
KAFKA-14183; Cluster metadata bootstrap file should use header/footer (#12565)

The bootstrap.checkpoint files should include a control record batch for
the SnapshotHeaderRecord at the start of the file. It should also
include a control record batch for the SnapshotFooterRecord at the end
of the file.

The snapshot header record is important because it versions the rest of
the bootstrap file.
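
For illustration, a minimal sketch of how a reader can rely on the new layout
(the class name BootstrapHeaderCheck and the command-line path argument are
hypothetical; BatchFileReader is the reader class touched by this change). The
first batch must be a control batch, and the header's version field gates how
the rest of the file is interpreted:

import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.metadata.util.BatchFileReader;
import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class BootstrapHeaderCheck {
    public static void main(String[] args) throws Exception {
        // args[0] is a path to a bootstrap.checkpoint file
        try (BatchFileReader reader = new BatchFileReader.Builder()
                .setPath(args[0]).build()) {
            BatchAndType first = reader.next();
            if (!first.isControl()) {
                throw new IllegalStateException("Bootstrap file does not start with a control batch");
            }
            ApiMessageAndVersion record = first.batch().records().get(0);
            SnapshotHeaderRecord header = (SnapshotHeaderRecord) record.message();
            // The header's version tells the reader how to parse the remaining batches
            System.out.println("bootstrap.checkpoint header version: " + header.version());
        }
    }
}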

Reviewers: David Arthur <mumrah@gmail.com>
José Armando García Sancio committed f83c6f2da4 (2 years ago, via GitHub)
10 changed files:

  1. clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java (13 changes)
  2. clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (21 changes)
  3. clients/src/test/java/org/apache/kafka/common/record/ControlRecordUtilsTest.java (43 changes)
  4. metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java (18 changes)
  5. metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java (59 changes)
  6. metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java (108 changes)
  7. raft/src/main/java/org/apache/kafka/raft/LeaderState.java (2 changes)
  8. raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java (27 changes)
  9. raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java (8 changes)
  10. raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java (18 changes)

clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java (13 changes)

@@ -27,10 +27,9 @@ import java.nio.ByteBuffer;
  * Utility class for easy interaction with control records.
  */
 public class ControlRecordUtils {
-    public static final short LEADER_CHANGE_SCHEMA_HIGHEST_VERSION = LeaderChangeMessage.HIGHEST_SUPPORTED_VERSION;
-    public static final short SNAPSHOT_HEADER_HIGHEST_VERSION = SnapshotHeaderRecord.HIGHEST_SUPPORTED_VERSION;
-    public static final short SNAPSHOT_FOOTER_HIGHEST_VERSION = SnapshotFooterRecord.HIGHEST_SUPPORTED_VERSION;
+    public static final short LEADER_CHANGE_CURRENT_VERSION = 0;
+    public static final short SNAPSHOT_HEADER_CURRENT_VERSION = 0;
+    public static final short SNAPSHOT_FOOTER_CURRENT_VERSION = 0;

     public static LeaderChangeMessage deserializeLeaderChangeMessage(Record record) {
         ControlRecordType recordType = ControlRecordType.parse(record.key());
@@ -43,7 +42,7 @@ public class ControlRecordUtils {
     public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) {
         ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
-        return new LeaderChangeMessage(byteBufferAccessor, LEADER_CHANGE_SCHEMA_HIGHEST_VERSION);
+        return new LeaderChangeMessage(byteBufferAccessor, LEADER_CHANGE_CURRENT_VERSION);
     }

     public static SnapshotHeaderRecord deserializedSnapshotHeaderRecord(Record record) {
@@ -57,7 +56,7 @@ public class ControlRecordUtils {
     public static SnapshotHeaderRecord deserializedSnapshotHeaderRecord(ByteBuffer data) {
         ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
-        return new SnapshotHeaderRecord(byteBufferAccessor, SNAPSHOT_HEADER_HIGHEST_VERSION);
+        return new SnapshotHeaderRecord(byteBufferAccessor, SNAPSHOT_HEADER_CURRENT_VERSION);
     }

     public static SnapshotFooterRecord deserializedSnapshotFooterRecord(Record record) {
@@ -71,6 +70,6 @@ public class ControlRecordUtils {
     public static SnapshotFooterRecord deserializedSnapshotFooterRecord(ByteBuffer data) {
         ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
-        return new SnapshotFooterRecord(byteBufferAccessor, SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        return new SnapshotFooterRecord(byteBufferAccessor, SNAPSHOT_FOOTER_CURRENT_VERSION);
     }
 }

clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (21 changes)

@@ -609,18 +609,27 @@ public class MemoryRecordsBuilder implements AutoCloseable {
         if (partitionLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) {
             throw new IllegalArgumentException("Partition leader epoch must be valid, but get " + partitionLeaderEpoch);
         }
-        appendControlRecord(timestamp, ControlRecordType.LEADER_CHANGE,
-            MessageUtil.toByteBuffer(leaderChangeMessage, ControlRecordUtils.LEADER_CHANGE_SCHEMA_HIGHEST_VERSION));
+        appendControlRecord(
+            timestamp,
+            ControlRecordType.LEADER_CHANGE,
+            MessageUtil.toByteBuffer(leaderChangeMessage, ControlRecordUtils.LEADER_CHANGE_CURRENT_VERSION)
+        );
     }

     public void appendSnapshotHeaderMessage(long timestamp, SnapshotHeaderRecord snapshotHeaderRecord) {
-        appendControlRecord(timestamp, ControlRecordType.SNAPSHOT_HEADER,
-            MessageUtil.toByteBuffer(snapshotHeaderRecord, ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION));
+        appendControlRecord(
+            timestamp,
+            ControlRecordType.SNAPSHOT_HEADER,
+            MessageUtil.toByteBuffer(snapshotHeaderRecord, ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION)
+        );
     }

     public void appendSnapshotFooterMessage(long timestamp, SnapshotFooterRecord snapshotHeaderRecord) {
-        appendControlRecord(timestamp, ControlRecordType.SNAPSHOT_FOOTER,
-            MessageUtil.toByteBuffer(snapshotHeaderRecord, ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION));
+        appendControlRecord(
+            timestamp,
+            ControlRecordType.SNAPSHOT_FOOTER,
+            MessageUtil.toByteBuffer(snapshotHeaderRecord, ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION)
+        );
     }

     /**

clients/src/test/java/org/apache/kafka/common/record/ControlRecordUtilsTest.java (43 changes)

@@ -16,26 +16,48 @@
  */
 package org.apache.kafka.common.record;

-import org.apache.kafka.common.message.LeaderChangeMessage;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.junit.jupiter.api.Test;
-import java.nio.ByteBuffer;
-import java.util.Collections;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;

 public class ControlRecordUtilsTest {
+    @Test
+    public void testCurrentVersions() {
+        // If any of these asserts fail, please make sure that Kafka supports reading and
+        // writing the latest version for these records.
+        assertEquals(
+            LeaderChangeMessage.HIGHEST_SUPPORTED_VERSION,
+            ControlRecordUtils.LEADER_CHANGE_CURRENT_VERSION
+        );
+        assertEquals(
+            SnapshotHeaderRecord.HIGHEST_SUPPORTED_VERSION,
+            ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION
+        );
+        assertEquals(
+            SnapshotFooterRecord.HIGHEST_SUPPORTED_VERSION,
+            ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION
+        );
+    }
+
     @Test
     public void testInvalidControlRecordType() {
         IllegalArgumentException thrown = assertThrows(
-            IllegalArgumentException.class, () -> testDeserializeRecord(ControlRecordType.COMMIT));
-        assertEquals("Expected LEADER_CHANGE control record type(2), but found COMMIT", thrown.getMessage());
+            IllegalArgumentException.class,
+            () -> testDeserializeRecord(ControlRecordType.COMMIT)
+        );
+        assertEquals(
+            "Expected LEADER_CHANGE control record type(2), but found COMMIT",
+            thrown.getMessage()
+        );
     }

     @Test
@@ -47,9 +69,8 @@ public class ControlRecordUtilsTest {
         final int leaderId = 1;
         final int voterId = 2;
         LeaderChangeMessage data = new LeaderChangeMessage()
-            .setLeaderId(leaderId)
-            .setVoters(Collections.singletonList(
-                new Voter().setVoterId(voterId)));
+            .setLeaderId(leaderId)
+            .setVoters(Collections.singletonList(new Voter().setVoterId(voterId)));

         ByteBuffer valueBuffer = ByteBuffer.allocate(256);
         data.write(new ByteBufferAccessor(valueBuffer), new ObjectSerializationCache(), data.highestSupportedVersion());

metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java (18 changes)

@@ -113,12 +113,20 @@ public class BootstrapDirectory {
         }
         Path tempPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME + ".tmp");
         Files.deleteIfExists(tempPath);
-        try (BatchFileWriter writer = BatchFileWriter.open(tempPath)) {
-            for (ApiMessageAndVersion message : bootstrapMetadata.records()) {
-                writer.append(message);
+        try {
+            try (BatchFileWriter writer = BatchFileWriter.open(tempPath)) {
+                for (ApiMessageAndVersion message : bootstrapMetadata.records()) {
+                    writer.append(message);
+                }
             }
+            Files.move(
+                tempPath,
+                Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME),
+                ATOMIC_MOVE, REPLACE_EXISTING
+            );
+        } finally {
+            Files.deleteIfExists(tempPath);
         }
-        Files.move(tempPath, Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME),
-            ATOMIC_MOVE, REPLACE_EXISTING);
     }
 }

metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java (59 changes)

@@ -17,7 +17,10 @@
 package org.apache.kafka.metadata.util;

+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.MetadataRecordSerde;
@@ -36,16 +39,25 @@ import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
 /**
- * Write an arbitrary set of metadata records into a Kafka metadata log batch format. This is similar to the binary
- * format used for metadata snapshot files, but the log epoch and initial offset are set to zero.
+ * Write an arbitrary set of metadata records into a Kafka metadata log batch format.
+ *
+ * This is similar to the binary format used for metadata snapshot files, but the log epoch
+ * and initial offset are set to zero. This type includes a SnapshotHeaderRecord record in the
+ * first batch and a SnapshotFooterRecord record in the last batch.
  */
 public class BatchFileWriter implements AutoCloseable {
     private final FileChannel channel;
     private final BatchAccumulator<ApiMessageAndVersion> batchAccumulator;
+    private final Time time;

-    BatchFileWriter(FileChannel channel, BatchAccumulator<ApiMessageAndVersion> batchAccumulator) {
+    private BatchFileWriter(
+        FileChannel channel,
+        BatchAccumulator<ApiMessageAndVersion> batchAccumulator,
+        Time time
+    ) {
         this.channel = channel;
         this.batchAccumulator = batchAccumulator;
+        this.time = time;
     }

     public void append(ApiMessageAndVersion apiMessageAndVersion) {
@@ -57,25 +69,52 @@ public class BatchFileWriter implements AutoCloseable {
     }

     public void close() throws IOException {
-        for (BatchAccumulator.CompletedBatch<ApiMessageAndVersion> batch : batchAccumulator.drain()) {
-            Utils.writeFully(channel, batch.data.buffer());
+        // Append the footer before draining the batch accumulator and force it to create a batch
+        batchAccumulator.appendSnapshotFooterRecord(
+            new SnapshotFooterRecord()
+                .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION),
+            time.milliseconds()
+        );
+        batchAccumulator.forceDrain();
+
+        try {
+            for (BatchAccumulator.CompletedBatch<ApiMessageAndVersion> batch : batchAccumulator.drain()) {
+                Utils.writeFully(channel, batch.data.buffer());
+            }
+        } finally {
+            batchAccumulator.close();
+            channel.close();
         }
-        channel.close();
     }

     public static BatchFileWriter open(Path snapshotPath) throws IOException {
+        Time time = Time.SYSTEM;
         BatchAccumulator<ApiMessageAndVersion> batchAccumulator = new BatchAccumulator<>(
             0,
             0,
             Integer.MAX_VALUE,
             MAX_BATCH_SIZE_BYTES,
             new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
-            Time.SYSTEM,
+            time,
             CompressionType.NONE,
-            new MetadataRecordSerde());
+            new MetadataRecordSerde()
+        );
+
+        // Append the snapshot header control record and force it to create a batch
+        batchAccumulator.appendSnapshotHeaderRecord(
+            new SnapshotHeaderRecord()
+                .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION)
+                .setLastContainedLogTimestamp(0),
+            time.milliseconds()
+        );
+        batchAccumulator.forceDrain();

-        FileChannel channel = FileChannel.open(snapshotPath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+        FileChannel channel = FileChannel.open(
+            snapshotPath,
+            StandardOpenOption.CREATE_NEW,
+            StandardOpenOption.WRITE
+        );

-        return new BatchFileWriter(channel, batchAccumulator);
+        return new BatchFileWriter(channel, batchAccumulator, time);
     }
 }
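
For illustration, a minimal usage sketch of the writer after this change (the
class name WriteBootstrapExample, the topic values, and the output path are
hypothetical; the rest is the BatchFileWriter API shown above). open() now
writes the SnapshotHeaderRecord control batch before any payload records, and
close() appends the SnapshotFooterRecord control batch before flushing, so
callers only append their own records:

import java.nio.file.Paths;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class WriteBootstrapExample {
    public static void main(String[] args) throws Exception {
        // open() creates the file and appends the snapshot header batch
        try (BatchFileWriter writer = BatchFileWriter.open(Paths.get("bootstrap.checkpoint"))) {
            writer.append(new ApiMessageAndVersion(
                new TopicRecord()
                    .setName("example")
                    .setTopicId(Uuid.randomUuid()),
                (short) 0
            ));
        } // close() appends the snapshot footer batch and writes all batches out
    }
}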

metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java (108 changes, new file)

@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.util;
import java.io.File;
import java.nio.file.Path;
import java.util.List;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
final public class BatchFileWriterReaderTest {
    @Test
    public void testHeaderFooter() throws Exception {
        File tempFile = TestUtils.tempFile();
        Path tempPath = tempFile.toPath();
        // Delete the file because BatchFileWriter doesn't expect the file to exist
        tempFile.delete();

        try (BatchFileWriter writer = BatchFileWriter.open(tempPath)) {
            ApiMessageAndVersion message = new ApiMessageAndVersion(
                new TopicRecord()
                    .setName("bar")
                    .setTopicId(Uuid.fromString("cxBT72dK4si8Ied1iP4wBA")),
                (short) 0
            );
            writer.append(message);
        }

        try (BatchFileReader reader = new BatchFileReader.Builder()
                .setPath(tempPath.toString()).build()) {
            // Check the SnapshotHeaderRecord
            long currentOffset = 0;
            assertTrue(reader.hasNext());
            BatchAndType batchAndType = reader.next();
            assertTrue(batchAndType.isControl());

            Batch<ApiMessageAndVersion> batch = batchAndType.batch();
            assertEquals(currentOffset, batch.baseOffset());
            assertEquals(currentOffset, batch.lastOffset());

            List<ApiMessageAndVersion> records = batch.records();
            assertEquals(1, records.size());

            ApiMessageAndVersion apiMessageAndVersion = records.get(0);
            assertEquals(0, apiMessageAndVersion.version());

            SnapshotHeaderRecord headerRecord = (SnapshotHeaderRecord) apiMessageAndVersion.message();
            assertEquals(0, headerRecord.version());
            assertEquals(0, headerRecord.lastContainedLogTimestamp());

            // Check the TopicRecord
            currentOffset++;
            assertTrue(reader.hasNext());
            batchAndType = reader.next();
            assertFalse(batchAndType.isControl());

            batch = batchAndType.batch();
            assertEquals(currentOffset, batch.baseOffset());
            assertEquals(currentOffset, batch.lastOffset());

            records = batch.records();
            assertEquals(1, records.size());

            apiMessageAndVersion = records.get(0);
            assertEquals(0, apiMessageAndVersion.version());

            TopicRecord topicRecord = (TopicRecord) apiMessageAndVersion.message();
            assertEquals("bar", topicRecord.name());
            assertEquals(Uuid.fromString("cxBT72dK4si8Ied1iP4wBA"), topicRecord.topicId());

            // Check the SnapshotFooterRecord
            currentOffset++;
            assertTrue(reader.hasNext());
            batchAndType = reader.next();
            assertTrue(batchAndType.isControl());

            batch = batchAndType.batch();
            assertEquals(currentOffset, batch.baseOffset());
            assertEquals(currentOffset, batch.lastOffset());

            records = batch.records();
            assertEquals(1, records.size());

            apiMessageAndVersion = records.get(0);
            assertEquals(0, apiMessageAndVersion.version());

            SnapshotFooterRecord footerRecord = (SnapshotFooterRecord) apiMessageAndVersion.message();
            assertEquals(0, footerRecord.version());
        }
    }
}

raft/src/main/java/org/apache/kafka/raft/LeaderState.java (2 changes)

@@ -96,7 +96,7 @@ public class LeaderState<T> implements EpochState {
         List<Voter> grantingVoters = convertToVoters(this.grantingVoters());

         LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
-            .setVersion(ControlRecordUtils.LEADER_CHANGE_SCHEMA_HIGHEST_VERSION)
+            .setVersion(ControlRecordUtils.LEADER_CHANGE_CURRENT_VERSION)
             .setLeaderId(this.election().leaderId())
             .setVoters(voters)
             .setGrantingVoters(grantingVoters);

raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java (27 changes)

@@ -260,18 +260,18 @@ public class BatchAccumulator<T> implements Closeable {
     /**
      * Append a {@link LeaderChangeMessage} record to the batch
      *
-     * @param @LeaderChangeMessage The message to append
-     * @param @currentTimeMs The timestamp of message generation
+     * @param LeaderChangeMessage The message to append
+     * @param currentTimestamp The current time in milliseconds
      * @throws IllegalStateException on failure to allocate a buffer for the record
      */
     public void appendLeaderChangeMessage(
         LeaderChangeMessage leaderChangeMessage,
-        long currentTimeMs
+        long currentTimestamp
     ) {
         appendControlMessage(buffer -> {
             return MemoryRecords.withLeaderChangeMessage(
                 this.nextOffset,
-                currentTimeMs,
+                currentTimestamp,
                 this.epoch,
                 buffer,
                 leaderChangeMessage
@@ -283,17 +283,18 @@ public class BatchAccumulator<T> implements Closeable {
     /**
      * Append a {@link SnapshotHeaderRecord} record to the batch
      *
-     * @param snapshotHeaderRecord The message to append
+     * @param snapshotHeaderRecord The record to append
+     * @param currentTimestamp The current time in milliseconds
      * @throws IllegalStateException on failure to allocate a buffer for the record
      */
-    public void appendSnapshotHeaderMessage(
+    public void appendSnapshotHeaderRecord(
         SnapshotHeaderRecord snapshotHeaderRecord,
-        long currentTimeMs
+        long currentTimestamp
     ) {
         appendControlMessage(buffer -> {
             return MemoryRecords.withSnapshotHeaderRecord(
                 this.nextOffset,
-                currentTimeMs,
+                currentTimestamp,
                 this.epoch,
                 buffer,
                 snapshotHeaderRecord
@@ -304,18 +305,18 @@ public class BatchAccumulator<T> implements Closeable {
     /**
      * Append a {@link SnapshotFooterRecord} record to the batch
      *
-     * @param snapshotFooterRecord The message to append
-     * @param currentTimeMs
+     * @param snapshotFooterRecord The record to append
+     * @param currentTimestamp The current time in milliseconds
      * @throws IllegalStateException on failure to allocate a buffer for the record
      */
-    public void appendSnapshotFooterMessage(
+    public void appendSnapshotFooterRecord(
         SnapshotFooterRecord snapshotFooterRecord,
-        long currentTimeMs
+        long currentTimestamp
     ) {
         appendControlMessage(buffer -> {
             return MemoryRecords.withSnapshotFooterRecord(
                 this.nextOffset,
-                currentTimeMs,
+                currentTimestamp,
                 this.epoch,
                 buffer,
                 snapshotFooterRecord

raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java (8 changes)

@@ -78,9 +78,9 @@ final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
         }

         SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION)
+            .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION)
             .setLastContainedLogTimestamp(lastContainedLogTimestamp);
-        accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds());
+        accumulator.appendSnapshotHeaderRecord(headerRecord, time.milliseconds());
         accumulator.forceDrain();
     }
@@ -91,8 +91,8 @@ final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
      */
     private void finalizeSnapshotWithFooter() {
         SnapshotFooterRecord footerRecord = new SnapshotFooterRecord()
-            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
-        accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds());
+            .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION);
+        accumulator.appendSnapshotFooterRecord(footerRecord, time.milliseconds());
         accumulator.forceDrain();
     }

raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java (18 changes)

@@ -18,26 +18,26 @@ package org.apache.kafka.snapshot;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
-import java.util.Iterator;
 import java.util.Set;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
 import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClientTestContext;
 import org.apache.kafka.raft.internals.StringSerde;
-import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
-import org.apache.kafka.common.record.ControlRecordUtils;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;

 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -219,7 +219,7 @@ final public class SnapshotWriterReaderTest {
         countRecords += 1;

         SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializedSnapshotHeaderRecord(record);
-        assertEquals(headerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION);
+        assertEquals(headerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION);
         assertEquals(headerRecord.lastContainedLogTimestamp(), lastContainedLogTime);

         assertFalse(records.hasNext());
@@ -239,7 +239,7 @@ final public class SnapshotWriterReaderTest {
         assertTrue(batch.isControlBatch());

         SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializedSnapshotFooterRecord(record);
-        assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION);
+        assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION);

         return countRecords;
     }
