
KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage. (#10271)

KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage. This topic will receive events of RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, and RemotePartitionDeleteMetadata. These events are serialized into Kafka protocol message format.
Added tests for all the event types for that topic.

This is part of the tiered storage implementation KIP-405.

Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
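
For context, a minimal round trip through the new serde looks like the sketch below. This is illustrative only; the metadata values mirror RemoteLogMetadataSerdeTest further down.

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
    import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
    import java.util.Collections;

    TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
    RemoteLogSegmentId segmentId = new RemoteLogSegmentId(tp, Uuid.randomUuid());
    // Segment covering offsets 0..100, maxTimestampMs -1, brokerId 1, event time now, 1024 bytes, one leader epoch.
    RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L, -1L, 1,
            System.currentTimeMillis(), 1024, Collections.singletonMap(0, 0L));

    RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
    byte[] payload = serde.serialize(metadata);                  // bytes in Kafka protocol message format
    RemoteLogMetadata roundTripped = serde.deserialize(payload); // equals the original metadata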
Satish Duggana, committed via GitHub
commit a1367f57f5
Changed files (lines changed in parentheses):

  1. build.gradle (20)
  2. checkstyle/import-control.xml (4)
  3. checkstyle/suppressions.xml (2)
  4. raft/src/main/java/org/apache/kafka/raft/RecordSerde.java (2)
  5. raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java (93)
  6. raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java (44)
  7. storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadata.java (56)
  8. storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java (49)
  9. storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java (43)
  10. storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteMetadata.java (50)
  11. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java (68)
  12. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java (104)
  13. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataTransform.java (52)
  14. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java (98)
  15. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java (59)
  16. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java (54)
  17. storage/src/main/resources/message/RemoteLogSegmentMetadata.json (126)
  18. storage/src/main/resources/message/RemoteLogSegmentMetadataUpdate.json (82)
  19. storage/src/main/resources/message/RemotePartitionDeleteMetadata.json (68)
  20. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java (111)
  21. storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java (85)

build.gradle (20)

@@ -1411,6 +1411,8 @@ project(':storage') {
dependencies {
implementation project(':storage:api')
implementation project(':clients')
implementation project(':metadata')
implementation project(':raft')
implementation libs.slf4jApi
implementation libs.jacksonDatabind
@@ -1438,19 +1440,33 @@ project(':storage') {
}
}
task processMessages(type:JavaExec) {
main = "org.apache.kafka.message.MessageGenerator"
classpath = project(':generator').sourceSets.main.runtimeClasspath
args = [ "-p", " org.apache.kafka.server.log.remote.metadata.storage.generated",
"-o", "src/generated/java/org/apache/kafka/server/log/remote/metadata/storage/generated",
"-i", "src/main/resources/message",
"-m", "MessageDataGenerator", "JsonConverterGenerator",
"-t", "MetadataRecordTypeGenerator", "MetadataJsonConvertersGenerator" ]
inputs.dir("src/main/resources/message")
outputs.dir("src/generated/java/org/apache/kafka/server/log/remote/metadata/storage/generated")
}
sourceSets {
main {
java {
srcDirs = ["src/main/java"]
srcDirs = ["src/generated/java", "src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
srcDirs = ["src/generated/java", "src/test/java"]
}
}
}
compileJava.dependsOn 'processMessages'
jar {
dependsOn createVersionFile
from("$buildDir") {

checkstyle/import-control.xml (4)

@@ -267,11 +267,13 @@
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="com.fasterxml.jackson" />
<subpackage name="log">
<allow pkg="org.apache.kafka.server.log" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.raft.metadata" />
</subpackage>
</subpackage>
<subpackage name="shell">

checkstyle/suppressions.xml (2)

@@ -184,6 +184,8 @@
files="streams[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="raft[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="storage[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="ImportControl" files="FetchResponseData.java"/>
<suppress checks="ImportControl" files="RecordsSerdeTest.java"/>

raft/src/main/java/org/apache/kafka/raft/RecordSerde.java (2)

@@ -21,7 +21,7 @@ import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.protocol.Writable;
/**
* Serde interface for records written to the Raft log. This class assumes
* Serde interface for records written to a metadata log. This class assumes
* a two-pass serialization, with the first pass used to compute the size of the
* serialized record, and the second pass to write the object.
*/

raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java (93)

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.metadata;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.raft.RecordSerde;
/**
* This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement
* {@link #apiMessageFor(short)} to return an {@code ApiMessage} instance for the given {@code apiKey}.
*
* This can be used as the underlying serialization mechanism for records defined with {@link ApiMessage}s.
* <p></p>
* Serialization format for the given {@code ApiMessageAndVersion} is below:
* <p></p>
* <pre>
* [data_frame_version header message]
* header => [api_key version]
*
* data_frame_version : This is the header version, current value is 0. Header includes both api_key and version.
* api_key : apiKey of {@code ApiMessageAndVersion} object.
* version : version of {@code ApiMessageAndVersion} object.
* message : serialized message of {@code ApiMessageAndVersion} object.
* </pre>
*/
public abstract class AbstractApiMessageSerde implements RecordSerde<ApiMessageAndVersion> {
private static final short DEFAULT_FRAME_VERSION = 0;
private static final int DEFAULT_FRAME_VERSION_SIZE = ByteUtils.sizeOfUnsignedVarint(DEFAULT_FRAME_VERSION);
@Override
public int recordSize(ApiMessageAndVersion data,
ObjectSerializationCache serializationCache) {
int size = DEFAULT_FRAME_VERSION_SIZE;
size += ByteUtils.sizeOfUnsignedVarint(data.message().apiKey());
size += ByteUtils.sizeOfUnsignedVarint(data.version());
size += data.message().size(serializationCache, data.version());
return size;
}
@Override
public void write(ApiMessageAndVersion data,
ObjectSerializationCache serializationCache,
Writable out) {
out.writeUnsignedVarint(DEFAULT_FRAME_VERSION);
out.writeUnsignedVarint(data.message().apiKey());
out.writeUnsignedVarint(data.version());
data.message().write(out, serializationCache, data.version());
}
@Override
public ApiMessageAndVersion read(Readable input,
int size) {
short frameVersion = (short) input.readUnsignedVarint();
if (frameVersion != DEFAULT_FRAME_VERSION) {
throw new SerializationException("Could not deserialize metadata record due to unknown frame version "
+ frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)");
}
short apiKey = (short) input.readUnsignedVarint();
short version = (short) input.readUnsignedVarint();
ApiMessage record = apiMessageFor(apiKey);
record.read(input, version);
return new ApiMessageAndVersion(record, version);
}
/**
* Return {@code ApiMessage} instance for the given {@code apiKey}. This is used while deserializing the bytes
* payload into the respective {@code ApiMessage} in {@link #read(Readable, int)} method.
*
* @param apiKey apiKey for which an {@code ApiMessage} is to be created.
*/
public abstract ApiMessage apiMessageFor(short apiKey);
}
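
To make the framing concrete: the header written by write(...) above is three unsigned varints, so small values take one byte each. A standalone sketch (using ByteUtils directly, not part of this patch) for apiKey 0, version 0:

    import org.apache.kafka.common.utils.ByteUtils;
    import java.nio.ByteBuffer;

    ByteBuffer buffer = ByteBuffer.allocate(16);
    ByteUtils.writeUnsignedVarint(0, buffer); // data_frame_version, always 0 today: one byte 0x00
    ByteUtils.writeUnsignedVarint(0, buffer); // api_key of the message: one byte 0x00
    ByteUtils.writeUnsignedVarint(0, buffer); // version of the message: one byte 0x00
    // ... the serialized ApiMessage body follows these three header bytes.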

raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java (44)

@@ -16,51 +16,13 @@
*/
package org.apache.kafka.raft.metadata;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.raft.RecordSerde;
public class MetadataRecordSerde implements RecordSerde<ApiMessageAndVersion> {
private static final short DEFAULT_FRAME_VERSION = 0;
private static final int DEFAULT_FRAME_VERSION_SIZE = ByteUtils.sizeOfUnsignedVarint(DEFAULT_FRAME_VERSION);
public class MetadataRecordSerde extends AbstractApiMessageSerde {
@Override
public int recordSize(ApiMessageAndVersion data, ObjectSerializationCache serializationCache) {
int size = DEFAULT_FRAME_VERSION_SIZE;
size += ByteUtils.sizeOfUnsignedVarint(data.message().apiKey());
size += ByteUtils.sizeOfUnsignedVarint(data.version());
size += data.message().size(serializationCache, data.version());
return size;
public ApiMessage apiMessageFor(short apiKey) {
return MetadataRecordType.fromId(apiKey).newMetadataRecord();
}
@Override
public void write(ApiMessageAndVersion data, ObjectSerializationCache serializationCache, Writable out) {
out.writeUnsignedVarint(DEFAULT_FRAME_VERSION);
out.writeUnsignedVarint(data.message().apiKey());
out.writeUnsignedVarint(data.version());
data.message().write(out, serializationCache, data.version());
}
@Override
public ApiMessageAndVersion read(Readable input, int size) {
short frameVersion = (short) input.readUnsignedVarint();
if (frameVersion != DEFAULT_FRAME_VERSION) {
throw new SerializationException("Could not deserialize metadata record due to unknown frame version "
+ frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)");
}
short apiKey = (short) input.readUnsignedVarint();
short version = (short) input.readUnsignedVarint();
MetadataRecordType recordType = MetadataRecordType.fromId(apiKey);
ApiMessage record = recordType.newMetadataRecord();
record.read(input, version);
return new ApiMessageAndVersion(record, version);
}
}
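
Usage is unchanged by this refactor. A sketch of the two-pass sequence, assuming an ApiMessageAndVersion named messageAndVersion is already in hand (this mirrors BytesApiMessageSerde.serialize below):

    import org.apache.kafka.common.protocol.ByteBufferAccessor;
    import org.apache.kafka.common.protocol.ObjectSerializationCache;
    import java.nio.ByteBuffer;

    MetadataRecordSerde serde = new MetadataRecordSerde();
    ObjectSerializationCache cache = new ObjectSerializationCache();
    int size = serde.recordSize(messageAndVersion, cache);           // first pass: compute the size
    ByteBufferAccessor out = new ByteBufferAccessor(ByteBuffer.allocate(size));
    serde.write(messageAndVersion, cache, out);                      // second pass: write the bytes
    byte[] bytes = out.buffer().array();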

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadata.java (56)

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Base class for remote log metadata objects like {@link RemoteLogSegmentMetadata}, {@link RemoteLogSegmentMetadataUpdate},
* and {@link RemotePartitionDeleteMetadata}.
*/
@InterfaceStability.Evolving
public abstract class RemoteLogMetadata {
/**
* Broker id from which this event is generated.
*/
private final int brokerId;
/**
* Epoch time in milli seconds at which this event is generated.
*/
private final long eventTimestampMs;
protected RemoteLogMetadata(int brokerId, long eventTimestampMs) {
this.brokerId = brokerId;
this.eventTimestampMs = eventTimestampMs;
}
/**
* @return Epoch time in milli seconds at which this event occurred.
*/
public long eventTimestampMs() {
return eventTimestampMs;
}
/**
* @return Broker id from which this event is generated.
*/
public int brokerId() {
return brokerId;
}
}

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java (49)

@@ -33,7 +33,7 @@ import java.util.TreeMap;
* {@code RemoteLogSegmentMetadata}.
*/
@InterfaceStability.Evolving
public class RemoteLogSegmentMetadata {
public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
/**
* Universally unique remote log segment id.
@@ -50,21 +50,11 @@ public class RemoteLogSegmentMetadata {
*/
private final long endOffset;
/**
* Broker id from which this event is generated.
*/
private final int brokerId;
/**
* Maximum timestamp in milli seconds in the segment
*/
private final long maxTimestampMs;
/**
* Epoch time in milli seconds at which the respective {@link #state} is set.
*/
private final long eventTimestampMs;
/**
* LeaderEpoch vs offset for messages within this segment.
*/
@@ -105,14 +95,13 @@ public class RemoteLogSegmentMetadata {
int segmentSizeInBytes,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs) {
super(brokerId, eventTimestampMs);
this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
this.startOffset = startOffset;
this.endOffset = endOffset;
this.brokerId = brokerId;
this.maxTimestampMs = maxTimestampMs;
this.eventTimestampMs = eventTimestampMs;
this.segmentSizeInBytes = segmentSizeInBytes;
if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
@@ -177,13 +166,6 @@ public class RemoteLogSegmentMetadata {
return endOffset;
}
/**
* @return Epoch time in milli seconds at which this event is occurred.
*/
public long eventTimestampMs() {
return eventTimestampMs;
}
/**
* @return Total size of this segment in bytes.
*/
@@ -205,13 +187,6 @@ public class RemoteLogSegmentMetadata {
return segmentLeaderEpochs;
}
/**
* @return Broker id from which this event is generated.
*/
public int brokerId() {
return brokerId;
}
/**
* Returns the current state of this remote log segment. It can be any of the below
* <ul>
@@ -251,17 +226,19 @@ public class RemoteLogSegmentMetadata {
return false;
}
RemoteLogSegmentMetadata that = (RemoteLogSegmentMetadata) o;
return startOffset == that.startOffset && endOffset == that.endOffset && brokerId == that.brokerId
&& maxTimestampMs == that.maxTimestampMs && eventTimestampMs == that.eventTimestampMs
&& segmentSizeInBytes == that.segmentSizeInBytes
&& Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId)
&& Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == that.state;
return startOffset == that.startOffset && endOffset == that.endOffset
&& maxTimestampMs == that.maxTimestampMs
&& segmentSizeInBytes == that.segmentSizeInBytes
&& Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId)
&& Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == that.state
&& eventTimestampMs() == that.eventTimestampMs()
&& brokerId() == that.brokerId();
}
@Override
public int hashCode() {
return Objects.hash(remoteLogSegmentId, startOffset, endOffset, brokerId, maxTimestampMs,
eventTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, state);
return Objects.hash(remoteLogSegmentId, startOffset, endOffset, brokerId(), maxTimestampMs,
eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, state);
}
@Override
@@ -270,9 +247,9 @@ public class RemoteLogSegmentMetadata {
"remoteLogSegmentId=" + remoteLogSegmentId +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", brokerId=" + brokerId +
", brokerId=" + brokerId() +
", maxTimestampMs=" + maxTimestampMs +
", eventTimestampMs=" + eventTimestampMs +
", eventTimestampMs=" + eventTimestampMs() +
", segmentLeaderEpochs=" + segmentLeaderEpochs +
", segmentSizeInBytes=" + segmentSizeInBytes +
", state=" + state +

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java (43)

@@ -26,28 +26,18 @@ import java.util.Objects;
* This also includes the timestamp of this event.
*/
@InterfaceStability.Evolving
public class RemoteLogSegmentMetadataUpdate {
public class RemoteLogSegmentMetadataUpdate extends RemoteLogMetadata {
/**
* Universally unique remote log segment id.
*/
private final RemoteLogSegmentId remoteLogSegmentId;
/**
* Epoch time in milli seconds at which this event is generated.
*/
private final long eventTimestampMs;
/**
* It indicates the state in which the action is executed on this segment.
*/
private final RemoteLogSegmentState state;
/**
* Broker id from which this event is generated.
*/
private final int brokerId;
/**
* @param remoteLogSegmentId Universally unique remote log segment id.
* @param eventTimestampMs Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage.
@@ -56,10 +46,9 @@ public class RemoteLogSegmentMetadataUpdate {
*/
public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId remoteLogSegmentId, long eventTimestampMs,
RemoteLogSegmentState state, int brokerId) {
super(brokerId, eventTimestampMs);
this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
this.brokerId = brokerId;
this.eventTimestampMs = eventTimestampMs;
}
/**
@@ -69,13 +58,6 @@ public class RemoteLogSegmentMetadataUpdate {
return remoteLogSegmentId;
}
/**
* @return Epoch time in milli seconds at which this event is generated.
*/
public long eventTimestampMs() {
return eventTimestampMs;
}
/**
* It represents the state of the remote log segment. It can be one of the values of {@link RemoteLogSegmentState}.
*/
@@ -83,13 +65,6 @@ public class RemoteLogSegmentMetadataUpdate {
return state;
}
/**
* @return Broker id from which this event is generated.
*/
public int brokerId() {
return brokerId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -99,24 +74,24 @@ public class RemoteLogSegmentMetadataUpdate {
return false;
}
RemoteLogSegmentMetadataUpdate that = (RemoteLogSegmentMetadataUpdate) o;
return eventTimestampMs == that.eventTimestampMs &&
Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId) &&
return Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId) &&
state == that.state &&
brokerId == that.brokerId;
eventTimestampMs() == that.eventTimestampMs() &&
brokerId() == that.brokerId();
}
@Override
public int hashCode() {
return Objects.hash(remoteLogSegmentId, eventTimestampMs, state, brokerId);
return Objects.hash(remoteLogSegmentId, state, eventTimestampMs(), brokerId());
}
@Override
public String toString() {
return "RemoteLogSegmentMetadataUpdate{" +
"remoteLogSegmentId=" + remoteLogSegmentId +
", eventTimestampMs=" + eventTimestampMs +
", state=" + state +
", brokerId=" + brokerId +
", state=" + state +
", eventTimestampMs=" + eventTimestampMs() +
", brokerId=" + brokerId() +
'}';
}
}

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemotePartitionDeleteMetadata.java (50)

@@ -22,32 +22,30 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Objects;
/**
* This class represents the metadata about the remote partition. It can be updated with {@link RemoteLogMetadataManager#putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata)}.
* This class represents the metadata about the remote partition. It can be created/updated with {@link RemoteLogMetadataManager#putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata)}.
* Possible state transitions are mentioned at {@link RemotePartitionDeleteState}.
*/
@InterfaceStability.Evolving
public class RemotePartitionDeleteMetadata {
public class RemotePartitionDeleteMetadata extends RemoteLogMetadata {
private final TopicIdPartition topicIdPartition;
private final RemotePartitionDeleteState state;
private final long eventTimestamp;
private final int brokerId;
/**
* Creates an instance of this class with the given metadata.
*
* @param topicIdPartition
* @param state
* @param eventTimestamp
* @param brokerId
* @param topicIdPartition Topic partition for which this event is meant.
* @param state State of the remote topic partition.
* @param eventTimestampMs Epoch time in milli seconds at which this event occurred.
* @param brokerId Id of the broker in which this event is raised.
*/
public RemotePartitionDeleteMetadata(TopicIdPartition topicIdPartition,
RemotePartitionDeleteState state,
long eventTimestamp,
long eventTimestampMs,
int brokerId) {
super(brokerId, eventTimestampMs);
this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
this.state = Objects.requireNonNull(state);
this.eventTimestamp = eventTimestamp;
this.brokerId = brokerId;
}
/**
@@ -64,27 +62,13 @@ public class RemotePartitionDeleteMetadata {
return state;
}
/**
* @return Epoch time at which this event is occurred.
*/
public long eventTimestamp() {
return eventTimestamp;
}
/**
* @return broker id from which this event is generated.
*/
public int brokerId() {
return brokerId;
}
@Override
public String toString() {
return "RemotePartitionDeleteMetadata{" +
"topicPartition=" + topicIdPartition +
", state=" + state +
", eventTimestamp=" + eventTimestamp +
", brokerId=" + brokerId +
", eventTimestampMs=" + eventTimestampMs() +
", brokerId=" + brokerId() +
'}';
}
@@ -97,14 +81,14 @@ public class RemotePartitionDeleteMetadata {
return false;
}
RemotePartitionDeleteMetadata that = (RemotePartitionDeleteMetadata) o;
return eventTimestamp == that.eventTimestamp &&
brokerId == that.brokerId &&
Objects.equals(topicIdPartition, that.topicIdPartition) &&
state == that.state;
return Objects.equals(topicIdPartition, that.topicIdPartition) &&
state == that.state &&
eventTimestampMs() == that.eventTimestampMs() &&
brokerId() == that.brokerId();
}
@Override
public int hashCode() {
return Objects.hash(topicIdPartition, state, eventTimestamp, brokerId);
return Objects.hash(topicIdPartition, state, eventTimestampMs(), brokerId());
}
}
}

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java (68)

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage.serialization;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.raft.metadata.AbstractApiMessageSerde;
import java.nio.ByteBuffer;
/**
* This class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa. This can be used as the serialization protocol for any
* metadata records derived from {@code ApiMessage}s. It internally uses {@link AbstractApiMessageSerde} for the serialization/deserialization
* mechanism.
* <p></p>
* Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective
* {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective
* {@code ApiMessage} instance.
*/
public abstract class BytesApiMessageSerde {
private final AbstractApiMessageSerde metadataRecordSerde = new AbstractApiMessageSerde() {
@Override
public ApiMessage apiMessageFor(short apiKey) {
return BytesApiMessageSerde.this.apiMessageFor(apiKey);
}
};
public byte[] serialize(ApiMessageAndVersion messageAndVersion) {
ObjectSerializationCache cache = new ObjectSerializationCache();
int size = metadataRecordSerde.recordSize(messageAndVersion, cache);
ByteBufferAccessor writable = new ByteBufferAccessor(ByteBuffer.allocate(size));
metadataRecordSerde.write(messageAndVersion, cache, writable);
return writable.buffer().array();
}
public ApiMessageAndVersion deserialize(byte[] data) {
Readable readable = new ByteBufferAccessor(ByteBuffer.wrap(data));
return metadataRecordSerde.read(readable, data.length);
}
/**
* Return {@code ApiMessage} instance for the given {@code apiKey}. This is used while deserializing the bytes
* payload into the respective {@code ApiMessage} in {@link #deserialize(byte[])} method.
*
* @param apiKey apiKey for which an {@code ApiMessage} is to be created.
*/
public abstract ApiMessage apiMessageFor(short apiKey);
}
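
A sketch of how an implementor plugs in apiMessageFor; this is essentially what RemoteLogMetadataSerde in the next file does with the storage module's generated MetadataRecordType (an ApiMessageAndVersion named messageAndVersion is assumed to be in hand):

    BytesApiMessageSerde serde = new BytesApiMessageSerde() {
        @Override
        public ApiMessage apiMessageFor(short apiKey) {
            // Look up the generated record type for this apiKey.
            return MetadataRecordType.fromId(apiKey).newMetadataRecord();
        }
    };
    byte[] bytes = serde.serialize(messageAndVersion);       // ApiMessageAndVersion -> byte[]
    ApiMessageAndVersion decoded = serde.deserialize(bytes); // byte[] -> ApiMessageAndVersion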

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java (104)

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage.serialization;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import java.util.HashMap;
import java.util.Map;
/**
* This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde
* for the messages that are stored in internal remote log metadata topic.
*/
public class RemoteLogMetadataSerde {
private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey();
private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
private final Map<String, Short> remoteLogStorageClassToApiKey;
private final Map<Short, RemoteLogMetadataTransform> keyToTransform;
private final BytesApiMessageSerde bytesApiMessageSerde;
public RemoteLogMetadataSerde() {
remoteLogStorageClassToApiKey = createRemoteLogStorageClassToApiKeyMap();
keyToTransform = createRemoteLogMetadataTransforms();
bytesApiMessageSerde = new BytesApiMessageSerde() {
@Override
public ApiMessage apiMessageFor(short apiKey) {
return newApiMessage(apiKey);
}
};
}
protected ApiMessage newApiMessage(short apiKey) {
return MetadataRecordType.fromId(apiKey).newMetadataRecord();
}
protected Map<Short, RemoteLogMetadataTransform> createRemoteLogMetadataTransforms() {
Map<Short, RemoteLogMetadataTransform> map = new HashMap<>();
map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform());
map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform());
map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform());
return map;
}
protected Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
Map<String, Short> map = new HashMap<>();
map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
return map;
}
public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
if (apiKey == null) {
throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
+ " does not exist.");
}
@SuppressWarnings("unchecked")
ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
return bytesApiMessageSerde.serialize(apiMessageAndVersion);
}
public RemoteLogMetadata deserialize(byte[] data) {
ApiMessageAndVersion apiMessageAndVersion = bytesApiMessageSerde.deserialize(data);
return remoteLogMetadataTransform(apiMessageAndVersion.message().apiKey()).fromApiMessageAndVersion(apiMessageAndVersion);
}
private RemoteLogMetadataTransform remoteLogMetadataTransform(short apiKey) {
RemoteLogMetadataTransform metadataTransform = keyToTransform.get(apiKey);
if (metadataTransform == null) {
throw new IllegalArgumentException("RemoteLogMetadataTransform for apikey: " + apiKey + " does not exist.");
}
return metadataTransform;
}
}
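
Since this serde produces plain byte arrays, writing events to the internal topic needs no custom Kafka serializer. A hypothetical producer sketch; the configs are placeholders, a remoteLogMetadata instance is assumed in hand, and "__remote_log_metadata" is the internal topic name proposed in KIP-405, not part of this patch:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    byte[] value = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>("__remote_log_metadata", value));
    }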

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataTransform.java (52)

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage.serialization;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
/**
* This interface is about transforming {@link RemoteLogMetadata} objects into the respective {@link ApiMessageAndVersion} or vice versa.
* <p></p>
* Those metadata objects can be {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata},
* {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate}, or {@link org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata}.
* <p>
* @param <T> metadata type.
*
* @see RemoteLogSegmentMetadataTransform
* @see RemoteLogSegmentMetadataUpdateTransform
* @see RemotePartitionDeleteMetadataTransform
*/
public interface RemoteLogMetadataTransform<T extends RemoteLogMetadata> {
/**
* Transforms the given {@code metadata} object into the respective {@code ApiMessageAndVersion} object.
*
* @param metadata metadata object to be transformed.
* @return transformed {@code ApiMessageAndVersion} object.
*/
ApiMessageAndVersion toApiMessageAndVersion(T metadata);
/**
* Return the metadata object transformed from the given {@code apiMessageAndVersion}.
*
* @param apiMessageAndVersion ApiMessageAndVersion object to be transformed.
* @return transformed {@code T} metadata object.
*/
T fromApiMessageAndVersion(ApiMessageAndVersion apiMessageAndVersion);
}

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java (98)

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage.serialization;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadata segmentMetadata) {
RemoteLogSegmentMetadataRecord record = new RemoteLogSegmentMetadataRecord()
.setRemoteLogSegmentId(createRemoteLogSegmentIdEntry(segmentMetadata))
.setStartOffset(segmentMetadata.startOffset())
.setEndOffset(segmentMetadata.endOffset())
.setBrokerId(segmentMetadata.brokerId())
.setEventTimestampMs(segmentMetadata.eventTimestampMs())
.setMaxTimestampMs(segmentMetadata.maxTimestampMs())
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
.setRemoteLogSegmentState(segmentMetadata.state().id());
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
private List<RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry> createSegmentLeaderEpochsEntry(RemoteLogSegmentMetadata data) {
return data.segmentLeaderEpochs().entrySet().stream()
.map(entry -> new RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry()
.setLeaderEpoch(entry.getKey())
.setOffset(entry.getValue()))
.collect(Collectors.toList());
}
private RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadata data) {
return new RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry()
.setTopicIdPartition(
new RemoteLogSegmentMetadataRecord.TopicIdPartitionEntry()
.setId(data.remoteLogSegmentId().topicIdPartition().topicId())
.setName(data.remoteLogSegmentId().topicIdPartition().topicPartition().topic())
.setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition().partition()))
.setId(data.remoteLogSegmentId().id());
}
@Override
public RemoteLogSegmentMetadata fromApiMessageAndVersion(ApiMessageAndVersion apiMessageAndVersion) {
RemoteLogSegmentMetadataRecord record = (RemoteLogSegmentMetadataRecord) apiMessageAndVersion.message();
RemoteLogSegmentId remoteLogSegmentId = buildRemoteLogSegmentId(record.remoteLogSegmentId());
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
for (RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry segmentLeaderEpoch : record.segmentLeaderEpochs()) {
segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
}
RemoteLogSegmentMetadata remoteLogSegmentMetadata =
new RemoteLogSegmentMetadata(remoteLogSegmentId, record.startOffset(), record.endOffset(),
record.maxTimestampMs(), record.brokerId(),
record.eventTimestampMs(), record.segmentSizeInBytes(),
segmentLeaderEpochs);
RemoteLogSegmentMetadataUpdate rlsmUpdate
= new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, record.eventTimestampMs(),
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
record.brokerId());
return remoteLogSegmentMetadata.createWithUpdates(rlsmUpdate);
}
private RemoteLogSegmentId buildRemoteLogSegmentId(RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry entry) {
TopicIdPartition topicIdPartition =
new TopicIdPartition(entry.topicIdPartition().id(),
new TopicPartition(entry.topicIdPartition().name(), entry.topicIdPartition().partition()));
return new RemoteLogSegmentId(topicIdPartition, entry.id());
}
}

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java (59)

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage.serialization;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataUpdate> {
public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) {
RemoteLogSegmentMetadataUpdateRecord record = new RemoteLogSegmentMetadataUpdateRecord()
.setRemoteLogSegmentId(createRemoteLogSegmentIdEntry(segmentMetadataUpdate))
.setBrokerId(segmentMetadataUpdate.brokerId())
.setEventTimestampMs(segmentMetadataUpdate.eventTimestampMs())
.setRemoteLogSegmentState(segmentMetadataUpdate.state().id());
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
public RemoteLogSegmentMetadataUpdate fromApiMessageAndVersion(ApiMessageAndVersion apiMessageAndVersion) {
RemoteLogSegmentMetadataUpdateRecord record = (RemoteLogSegmentMetadataUpdateRecord) apiMessageAndVersion.message();
RemoteLogSegmentMetadataUpdateRecord.RemoteLogSegmentIdEntry entry = record.remoteLogSegmentId();
TopicIdPartition topicIdPartition = new TopicIdPartition(entry.topicIdPartition().id(),
new TopicPartition(entry.topicIdPartition().name(), entry.topicIdPartition().partition()));
return new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(topicIdPartition, entry.id()),
record.eventTimestampMs(), RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
}
private RemoteLogSegmentMetadataUpdateRecord.RemoteLogSegmentIdEntry createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadataUpdate data) {
return new RemoteLogSegmentMetadataUpdateRecord.RemoteLogSegmentIdEntry()
.setId(data.remoteLogSegmentId().id())
.setTopicIdPartition(
new RemoteLogSegmentMetadataUpdateRecord.TopicIdPartitionEntry()
.setName(data.remoteLogSegmentId().topicIdPartition().topicPartition().topic())
.setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition().partition())
.setId(data.remoteLogSegmentId().topicIdPartition().topicId()));
}
}

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java (54)

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage.serialization;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
public final class RemotePartitionDeleteMetadataTransform implements RemoteLogMetadataTransform<RemotePartitionDeleteMetadata> {
@Override
public ApiMessageAndVersion toApiMessageAndVersion(RemotePartitionDeleteMetadata partitionDeleteMetadata) {
RemotePartitionDeleteMetadataRecord record = new RemotePartitionDeleteMetadataRecord()
.setTopicIdPartition(createTopicIdPartitionEntry(partitionDeleteMetadata.topicIdPartition()))
.setEventTimestampMs(partitionDeleteMetadata.eventTimestampMs())
.setBrokerId(partitionDeleteMetadata.brokerId())
.setRemotePartitionDeleteState(partitionDeleteMetadata.state().id());
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
private RemotePartitionDeleteMetadataRecord.TopicIdPartitionEntry createTopicIdPartitionEntry(TopicIdPartition topicIdPartition) {
return new RemotePartitionDeleteMetadataRecord.TopicIdPartitionEntry()
.setName(topicIdPartition.topicPartition().topic())
.setPartition(topicIdPartition.topicPartition().partition())
.setId(topicIdPartition.topicId());
}
public RemotePartitionDeleteMetadata fromApiMessageAndVersion(ApiMessageAndVersion apiMessageAndVersion) {
RemotePartitionDeleteMetadataRecord record = (RemotePartitionDeleteMetadataRecord) apiMessageAndVersion.message();
TopicIdPartition topicIdPartition = new TopicIdPartition(record.topicIdPartition().id(),
new TopicPartition(record.topicIdPartition().name(), record.topicIdPartition().partition()));
return new RemotePartitionDeleteMetadata(topicIdPartition,
RemotePartitionDeleteState.forId(record.remotePartitionDeleteState()),
record.eventTimestampMs(), record.brokerId());
}
}

storage/src/main/resources/message/RemoteLogSegmentMetadata.json (126)

@@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 0,
"type": "metadata",
"name": "RemoteLogSegmentMetadataRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "RemoteLogSegmentId",
"type": "RemoteLogSegmentIdEntry",
"versions": "0+",
"about": "Unique representation of the remote log segment.",
"fields": [
{
"name": "TopicIdPartition",
"type": "TopicIdPartitionEntry",
"versions": "0+",
"about": "Represents unique topic partition.",
"fields": [
{
"name": "Name",
"type": "string",
"versions": "0+",
"about": "Topic name."
},
{
"name": "Id",
"type": "uuid",
"versions": "0+",
"about": "Unique identifier of the topic."
},
{
"name": "Partition",
"type": "int32",
"versions": "0+",
"about": "Partition number."
}
]
},
{
"name": "Id",
"type": "uuid",
"versions": "0+",
"about": "Unique identifier of the remote log segment."
}
]
},
{
"name": "StartOffset",
"type": "int64",
"versions": "0+",
"about": "Start offset of the segment."
},
{
"name": "EndOffset",
"type": "int64",
"versions": "0+",
"about": "End offset of the segment."
},
{
"name": "BrokerId",
"type": "int32",
"versions": "0+",
"about": "Broker id from which this event is generated."
},
{
"name": "MaxTimestampMs",
"type": "int64",
"versions": "0+",
"about": "Maximum timestamp in milli seconds with in this segment."
},
{
"name": "EventTimestampMs",
"type": "int64",
"versions": "0+",
"about": "Epoch time in milli seconds at which this event is generated."
},
{
"name": "SegmentLeaderEpochs",
"type": "[]SegmentLeaderEpochEntry",
"versions": "0+",
"about": "Leader epoch to start-offset mappings for the records with in this segment.",
"fields": [
{
"name": "LeaderEpoch",
"type": "int32",
"versions": "0+",
"about": "Leader epoch"
},
{
"name": "Offset",
"type": "int64",
"versions": "0+",
"about": "Start offset for the leader epoch."
}
]
},
{
"name": "SegmentSizeInBytes",
"type": "int32",
"versions": "0+",
"about": "Segment size in bytes."
},
{
"name": "RemoteLogSegmentState",
"type": "int8",
"versions": "0+",
"about": "State identifier of the remote log segment, which is RemoteLogSegmentState.id()."
}
]
}
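
The processMessages task feeds this schema to MessageGenerator, which emits RemoteLogSegmentMetadataRecord with one fluent setter per field above. A sketch with placeholder values; the nested RemoteLogSegmentId and SegmentLeaderEpochs entries are elided here (see RemoteLogSegmentMetadataTransform above for the full mapping):

    import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;

    RemoteLogSegmentMetadataRecord record = new RemoteLogSegmentMetadataRecord()
            .setStartOffset(0L)                              // StartOffset (int64)
            .setEndOffset(100L)                              // EndOffset (int64)
            .setBrokerId(1)                                  // BrokerId (int32)
            .setMaxTimestampMs(-1L)                          // MaxTimestampMs (int64)
            .setEventTimestampMs(System.currentTimeMillis()) // EventTimestampMs (int64)
            .setSegmentSizeInBytes(1024)                     // SegmentSizeInBytes (int32)
            .setRemoteLogSegmentState((byte) 0);             // RemoteLogSegmentState.id() (int8)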

storage/src/main/resources/message/RemoteLogSegmentMetadataUpdate.json (82)

@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 1,
"type": "metadata",
"name": "RemoteLogSegmentMetadataUpdateRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "RemoteLogSegmentId",
"type": "RemoteLogSegmentIdEntry",
"versions": "0+",
"about": "Unique representation of the remote log segment.",
"fields": [
{
"name": "TopicIdPartition",
"type": "TopicIdPartitionEntry",
"versions": "0+",
"about": "Represents unique topic partition.",
"fields": [
{
"name": "Name",
"type": "string",
"versions": "0+",
"about": "Topic name."
},
{
"name": "Id",
"type": "uuid",
"versions": "0+",
"about": "Unique identifier of the topic."
},
{
"name": "Partition",
"type": "int32",
"versions": "0+",
"about": "Partition number."
}
]
},
{
"name": "Id",
"type": "uuid",
"versions": "0+",
"about": "Unique identifier of the remote log segment."
}
]
},
{
"name": "BrokerId",
"type": "int32",
"versions": "0+",
"about": "Broker id from which this event is generated."
},
{
"name": "EventTimestampMs",
"type": "int64",
"versions": "0+",
"about": "Epoch time in milli seconds at which this event is generated."
},
{
"name": "RemoteLogSegmentState",
"type": "int8",
"versions": "0+",
"about": "State identifier of the remote log segment, which is RemoteLogSegmentState.id()."
}
]
}

storage/src/main/resources/message/RemotePartitionDeleteMetadata.json (68)

@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 2,
"type": "metadata",
"name": "RemotePartitionDeleteMetadataRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "TopicIdPartition",
"type": "TopicIdPartitionEntry",
"versions": "0+",
"about": "Represents unique topic partition.",
"fields": [
{
"name": "Name",
"type": "string",
"versions": "0+",
"about": "Topic name."
},
{
"name": "Id",
"type": "uuid",
"versions": "0+",
"about": "Unique identifier of the topic."
},
{
"name": "Partition",
"type": "int32",
"versions": "0+",
"about": "Partition number."
}
]
},
{
"name": "BrokerId",
"type": "int32",
"versions": "0+",
"about": "Broker (controller or leader) id from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."
},
{
"name": "EventTimestampMs",
"type": "int64",
"versions": "0+",
"about": "Epoch time in milli seconds at which this event is generated."
},
{
"name": "RemotePartitionDeleteState",
"type": "int8",
"versions": "0+",
"about": "Deletion state identifier of the remote partition, which is RemotePartitionDeleteState.id()."
}
]
}

storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java (111)

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
public class RemoteLogMetadataSerdeTest {
public static final String TOPIC = "foo";
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
private final Time time = new MockTime(1);
@Test
public void testRemoteLogSegmentMetadataSerde() {
RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata();
doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata);
}
@Test
public void testRemoteLogSegmentMetadataUpdateSerde() {
RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate();
doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate);
}
@Test
public void testRemotePartitionDeleteMetadataSerde() {
RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata();
doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata);
}
private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
Map<Integer, Long> segLeaderEpochs = new HashMap<>();
segLeaderEpochs.put(0, 0L);
segLeaderEpochs.put(1, 20L);
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024, segLeaderEpochs);
}
private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
}
private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() {
return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED,
time.milliseconds(), 0);
}
private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) {
// Serialize metadata and get the bytes.
RemoteLogMetadataSerde serializer = new RemoteLogMetadataSerde();
byte[] metadataBytes = serializer.serialize(remoteLogMetadata);
// Deserialize the bytes and check the RemoteLogMetadata object is as expected.
// Create another RemoteLogMetadataSerde instance to depict the real use case of the serializer and deserializer having their own instances.
RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde();
RemoteLogMetadata deserializedRemoteLogMetadata = deserializer.deserialize(metadataBytes);
Assertions.assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata);
}
@Test
public void testInvalidRemoteStorageMetadata() {
// Serializing throws an exception as the given RemoteLogMetadata type is not registered in the serde.
Assertions.assertThrows(IllegalArgumentException.class,
() -> new RemoteLogMetadataSerde().serialize(new InvalidRemoteLogMetadata(1, time.milliseconds())));
}
private static class InvalidRemoteLogMetadata extends RemoteLogMetadata {
public InvalidRemoteLogMetadata(int brokerId, long eventTimestampMs) {
super(brokerId, eventTimestampMs);
}
}
}

storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java (85)

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataUpdateTransform;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemotePartitionDeleteMetadataTransform;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
private final Time time = new MockTime(1);
@Test
public void testRemoteLogSegmentMetadataTransform() {
RemoteLogSegmentMetadataTransform metadataTransform = new RemoteLogSegmentMetadataTransform();
RemoteLogSegmentMetadata metadata = createRemoteLogSegmentMetadata();
ApiMessageAndVersion apiMessageAndVersion = metadataTransform.toApiMessageAndVersion(metadata);
RemoteLogSegmentMetadata remoteLogSegmentMetadataFromRecord = metadataTransform
.fromApiMessageAndVersion(apiMessageAndVersion);
Assertions.assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
}
@Test
public void testRemoteLogSegmentMetadataUpdateTransform() {
RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform();
RemoteLogSegmentMetadataUpdate metadataUpdate =
new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
Assertions.assertEquals(metadataUpdate, metadataUpdateFromRecord);
}
private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
}
@Test
public void testRemoteLogPartitionMetadataTransform() {
RemotePartitionDeleteMetadataTransform transform = new RemotePartitionDeleteMetadataTransform();
RemotePartitionDeleteMetadata partitionDeleteMetadata
= new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_STARTED, time.milliseconds(), 1);
ApiMessageAndVersion apiMessageAndVersion = transform.toApiMessageAndVersion(partitionDeleteMetadata);
RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = transform.fromApiMessageAndVersion(apiMessageAndVersion);
Assertions.assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord);
}
}