Browse Source

KAFKA-14499: [2/N] Add OffsetCommit record & related (#14047)

This patch does a few things:
1) It introduces the `OffsetAndMetadata` class which hold the committed offsets in the group coordinator.
2) It adds methods to deal with OffsetCommit records to `RecordHelpers`.
3) It adds `MetadataVersion#offsetCommitValueVersion` to get the version of the OffsetCommit value record that should be used.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Arthur <mumrah@gmail.com>, Justine Olshan <jolshan@confluent.io>
pull/14069/head
David Jacot 1 year ago committed by GitHub
parent
commit
2528dd4116
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      checkstyle/import-control.xml
  2. 125
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
  3. 69
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
  4. 74
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
  5. 108
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
  6. 12
      server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
  7. 20
      server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java

1
checkstyle/import-control.xml

@ -229,6 +229,7 @@ @@ -229,6 +229,7 @@
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.deferred" />

125
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java

@ -0,0 +1,125 @@ @@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.OptionalLong;
/**
* Represents a committed offset with its metadata.
*/
public class OffsetAndMetadata {
public static final String NO_METADATA = "";
/**
* The committed offset.
*/
public final long offset;
/**
* The leader epoch in use when the offset was committed.
*/
public final OptionalInt leaderEpoch;
/**
* The committed metadata. The Kafka offset commit API allows users to provide additional
* metadata (in the form of a string) when an offset is committed. This can be useful
* (for example) to store information about which node made the commit, what time the
* commit was made, etc.
*/
public final String metadata;
/**
* The commit timestamp in milliseconds.
*/
public final long commitTimestampMs;
/**
* The expire timestamp in milliseconds.
*/
public final OptionalLong expireTimestampMs;
public OffsetAndMetadata(
long offset,
OptionalInt leaderEpoch,
String metadata,
long commitTimestampMs,
OptionalLong expireTimestampMs
) {
this.offset = offset;
this.leaderEpoch = Objects.requireNonNull(leaderEpoch);
this.metadata = Objects.requireNonNull(metadata);
this.commitTimestampMs = commitTimestampMs;
this.expireTimestampMs = Objects.requireNonNull(expireTimestampMs);
}
@Override
public String toString() {
return "OffsetAndMetadata(offset=" + offset +
", leaderEpoch=" + leaderEpoch +
", metadata=" + metadata +
", commitTimestampMs=" + commitTimestampMs +
", expireTimestampMs=" + expireTimestampMs +
')';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OffsetAndMetadata that = (OffsetAndMetadata) o;
if (offset != that.offset) return false;
if (commitTimestampMs != that.commitTimestampMs) return false;
if (!leaderEpoch.equals(that.leaderEpoch)) return false;
if (!metadata.equals(that.metadata)) return false;
return expireTimestampMs.equals(that.expireTimestampMs);
}
@Override
public int hashCode() {
int result = (int) (offset ^ (offset >>> 32));
result = 31 * result + leaderEpoch.hashCode();
result = 31 * result + metadata.hashCode();
result = 31 * result + (int) (commitTimestampMs ^ (commitTimestampMs >>> 32));
result = 31 * result + expireTimestampMs.hashCode();
return result;
}
/**
* @return An OffsetAndMetadata created from a OffsetCommitValue record.
*/
public static OffsetAndMetadata fromRecord(
OffsetCommitValue record
) {
return new OffsetAndMetadata(
record.offset(),
record.leaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
OptionalInt.empty() : OptionalInt.of(record.leaderEpoch()),
record.metadata(),
record.commitTimestamp(),
record.expireTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ?
OptionalLong.empty() : OptionalLong.of(record.expireTimestamp())
);
}
}

69
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java

@ -17,6 +17,8 @@ @@ -17,6 +17,8 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
@ -33,6 +35,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen @@ -33,6 +35,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@ -467,6 +471,71 @@ public class RecordHelpers { @@ -467,6 +471,71 @@ public class RecordHelpers {
);
}
/**
* Creates an OffsetCommit record.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partitionId The partition id.
* @param offsetAndMetadata The offset and metadata.
* @param metadataVersion The metadata version.
* @return The record.
*/
public static Record newOffsetCommitRecord(
String groupId,
String topic,
int partitionId,
OffsetAndMetadata offsetAndMetadata,
MetadataVersion metadataVersion
) {
short version = metadataVersion.offsetCommitValueVersion(offsetAndMetadata.expireTimestampMs.isPresent());
return new Record(
new ApiMessageAndVersion(
new OffsetCommitKey()
.setGroup(groupId)
.setTopic(topic)
.setPartition(partitionId),
(short) 1
),
new ApiMessageAndVersion(
new OffsetCommitValue()
.setOffset(offsetAndMetadata.offset)
.setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setMetadata(offsetAndMetadata.metadata)
.setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
// Version 1 has a non-empty expireTimestamp field
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)),
version
)
);
}
/**
* Creates an OffsetCommit tombstone record.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partitionId The partition id.
* @return The record.
*/
public static Record newOffsetCommitTombstoneRecord(
String groupId,
String topic,
int partitionId
) {
return new Record(
new ApiMessageAndVersion(
new OffsetCommitKey()
.setGroup(groupId)
.setTopic(topic)
.setPartition(partitionId),
(short) 1
),
null
);
}
private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions
) {

74
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java

@ -0,0 +1,74 @@ @@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.junit.jupiter.api.Test;
import java.util.OptionalInt;
import java.util.OptionalLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class OffsetAndMetadataTest {
@Test
public void testAttributes() {
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
100L,
OptionalInt.of(10),
"metadata",
1234L,
OptionalLong.of(5678L)
);
assertEquals(100L, offsetAndMetadata.offset);
assertEquals(OptionalInt.of(10), offsetAndMetadata.leaderEpoch);
assertEquals("metadata", offsetAndMetadata.metadata);
assertEquals(1234L, offsetAndMetadata.commitTimestampMs);
assertEquals(OptionalLong.of(5678L), offsetAndMetadata.expireTimestampMs);
}
@Test
public void testFromRecord() {
OffsetCommitValue record = new OffsetCommitValue()
.setOffset(100L)
.setLeaderEpoch(-1)
.setMetadata("metadata")
.setCommitTimestamp(1234L)
.setExpireTimestamp(-1L);
assertEquals(new OffsetAndMetadata(
100L,
OptionalInt.empty(),
"metadata",
1234L,
OptionalLong.empty()
), OffsetAndMetadata.fromRecord(record));
record
.setLeaderEpoch(12)
.setExpireTimestamp(5678L);
assertEquals(new OffsetAndMetadata(
100L,
OptionalInt.of(12),
"metadata",
1234L,
OptionalLong.of(5678L)
), OffsetAndMetadata.fromRecord(record));
}
}

108
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java

@ -40,6 +40,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen @@ -40,6 +40,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
@ -48,6 +50,7 @@ import org.apache.kafka.server.common.MetadataVersion; @@ -48,6 +50,7 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
@ -59,6 +62,8 @@ import java.util.LinkedHashMap; @@ -59,6 +62,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Stream;
@ -648,4 +653,107 @@ public class RecordHelpersTest { @@ -648,4 +653,107 @@ public class RecordHelpersTest {
assertEquals(expectedRecord, groupMetadataRecord);
}
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testNewOffsetCommitRecord(MetadataVersion metadataVersion) {
OffsetCommitKey key = new OffsetCommitKey()
.setGroup("group-id")
.setTopic("foo")
.setPartition(1);
OffsetCommitValue value = new OffsetCommitValue()
.setOffset(100L)
.setLeaderEpoch(10)
.setMetadata("metadata")
.setCommitTimestamp(1234L)
.setExpireTimestamp(-1L);
Record expectedRecord = new Record(
new ApiMessageAndVersion(
key,
(short) 1),
new ApiMessageAndVersion(
value,
metadataVersion.offsetCommitValueVersion(false)
)
);
assertEquals(expectedRecord, RecordHelpers.newOffsetCommitRecord(
"group-id",
"foo",
1,
new OffsetAndMetadata(
100L,
OptionalInt.of(10),
"metadata",
1234L,
OptionalLong.empty()),
metadataVersion
));
value.setLeaderEpoch(-1);
assertEquals(expectedRecord, RecordHelpers.newOffsetCommitRecord(
"group-id",
"foo",
1,
new OffsetAndMetadata(
100L,
OptionalInt.empty(),
"metadata",
1234L,
OptionalLong.empty()),
metadataVersion
));
}
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testNewOffsetCommitRecordWithExpireTimestamp(MetadataVersion metadataVersion) {
Record expectedRecord = new Record(
new ApiMessageAndVersion(
new OffsetCommitKey()
.setGroup("group-id")
.setTopic("foo")
.setPartition(1),
(short) 1),
new ApiMessageAndVersion(
new OffsetCommitValue()
.setOffset(100L)
.setLeaderEpoch(10)
.setMetadata("metadata")
.setCommitTimestamp(1234L)
.setExpireTimestamp(5678L),
(short) 1 // When expire timestamp is set, it is always version 1.
)
);
assertEquals(expectedRecord, RecordHelpers.newOffsetCommitRecord(
"group-id",
"foo",
1,
new OffsetAndMetadata(
100L,
OptionalInt.of(10),
"metadata",
1234L,
OptionalLong.of(5678L)),
metadataVersion
));
}
@Test
public void testNewOffsetCommitTombstoneRecord() {
Record expectedRecord = new Record(
new ApiMessageAndVersion(
new OffsetCommitKey()
.setGroup("group-id")
.setTopic("foo")
.setPartition(1),
(short) 1),
null);
Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1);
assertEquals(expectedRecord, record);
}
}

12
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

@ -386,6 +386,18 @@ public enum MetadataVersion { @@ -386,6 +386,18 @@ public enum MetadataVersion {
}
}
public short offsetCommitValueVersion(boolean expireTimestampMs) {
if (isLessThan(MetadataVersion.IBP_2_1_IV0) || expireTimestampMs) {
return 1;
} else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) {
return 2;
} else {
// Serialize with the highest supported non-flexible version
// until a tagged field is introduced or the version is bumped.
return 3;
}
}
private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{

20
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java

@ -328,4 +328,24 @@ class MetadataVersionTest { @@ -328,4 +328,24 @@ class MetadataVersionTest {
}
assertEquals(expectedVersion, metadataVersion.groupMetadataValueVersion());
}
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testOffsetCommitValueVersion(MetadataVersion metadataVersion) {
final short expectedVersion;
if (metadataVersion.isAtLeast(MetadataVersion.IBP_2_1_IV1)) {
expectedVersion = 3;
} else if (metadataVersion.isAtLeast(IBP_2_1_IV0)) {
expectedVersion = 2;
} else {
expectedVersion = 1;
}
assertEquals(expectedVersion, metadataVersion.offsetCommitValueVersion(false));
}
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testOffsetCommitValueVersionWithExpiredTimestamp(MetadataVersion metadataVersion) {
assertEquals((short) 1, metadataVersion.offsetCommitValueVersion(true));
}
}

Loading…
Cancel
Save