Browse Source

MINOR: more KRaft Metadata Image tests (#13724)

Adds additional testing for the various KRaft *Image classes. For every image that we create we already test that we can get there by applying all the records corresponding to that image written out as a list of records. This patch adds more tests to confirm that we can get to each such final image with intermediate stops at all possible intermediate images along the way.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
pull/13813/head
Ron Dagostino 1 year ago committed by GitHub
parent
commit
edd64fa251
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java
  2. 42
      metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java
  3. 41
      metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
  4. 42
      metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
  5. 42
      metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
  6. 68
      metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
  7. 42
      metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java
  8. 46
      metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
  9. 58
      metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
  10. 71
      metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java

42
metadata/src/test/java/org/apache/kafka/image/AclsImageTest.java

@ -32,6 +32,7 @@ import java.util.ArrayList; @@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -73,31 +74,48 @@ public class AclsImageTest { @@ -73,31 +74,48 @@ public class AclsImageTest {
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(AclsImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(AclsImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply());
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1);
records.addAll(DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private void testToImageAndBack(AclsImage image) throws Throwable {
private static void testToImage(AclsImage image) {
testToImage(image, Optional.empty());
}
private static void testToImage(AclsImage image, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image)));
}
private static void testToImage(AclsImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> AclsImage.EMPTY,
AclsDelta::new
).test(image, fromRecords);
}
private static List<ApiMessageAndVersion> getImageRecords(AclsImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().build());
AclsDelta delta = new AclsDelta(AclsImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
AclsImage nextImage = delta.apply();
assertEquals(image, nextImage);
return writer.records();
}
}

42
metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java

@ -33,6 +33,7 @@ import java.util.Arrays; @@ -33,6 +33,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -89,31 +90,48 @@ public class ClientQuotasImageTest { @@ -89,31 +90,48 @@ public class ClientQuotasImageTest {
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(ClientQuotasImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(ClientQuotasImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply());
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1);
records.addAll(DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private void testToImageAndBack(ClientQuotasImage image) throws Throwable {
private static void testToImage(ClientQuotasImage image) {
testToImage(image, Optional.empty());
}
private static void testToImage(ClientQuotasImage image, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image)));
}
private static void testToImage(ClientQuotasImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> ClientQuotasImage.EMPTY,
ClientQuotasDelta::new
).test(image, fromRecords);
}
private static List<ApiMessageAndVersion> getImageRecords(ClientQuotasImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().build());
ClientQuotasDelta delta = new ClientQuotasDelta(ClientQuotasImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
ClientQuotasImage nextImage = delta.apply();
assertEquals(image, nextImage);
return writer.records();
}
}

41
metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java

@ -127,31 +127,48 @@ public class ClusterImageTest { @@ -127,31 +127,48 @@ public class ClusterImageTest {
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(ClusterImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(ClusterImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply());
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1);
records.addAll(DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private void testToImageAndBack(ClusterImage image) throws Throwable {
private static void testToImage(ClusterImage image) {
testToImage(image, Optional.empty());
}
private static void testToImage(ClusterImage image, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image)));
}
private static void testToImage(ClusterImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> ClusterImage.EMPTY,
ClusterDelta::new
).test(image, fromRecords);
}
private static List<ApiMessageAndVersion> getImageRecords(ClusterImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().build());
ClusterDelta delta = new ClusterDelta(ClusterImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
ClusterImage nextImage = delta.apply();
assertEquals(image, nextImage);
return writer.records();
}
}

42
metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java

@ -30,6 +30,7 @@ import java.util.ArrayList; @@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
@ -84,31 +85,48 @@ public class ConfigurationsImageTest { @@ -84,31 +85,48 @@ public class ConfigurationsImageTest {
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(ConfigurationsImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(ConfigurationsImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply());
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1);
records.addAll(DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private void testToImageAndBack(ConfigurationsImage image) throws Throwable {
private static void testToImage(ConfigurationsImage image) {
testToImage(image, Optional.empty());
}
private static void testToImage(ConfigurationsImage image, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image)));
}
private static void testToImage(ConfigurationsImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> ConfigurationsImage.EMPTY,
ConfigurationsDelta::new
).test(image, fromRecords);
}
private static List<ApiMessageAndVersion> getImageRecords(ConfigurationsImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().build());
ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
ConfigurationsImage nextImage = delta.apply();
assertEquals(image, nextImage);
return writer.records();
}
}

42
metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java

@ -32,6 +32,7 @@ import java.util.Collections; @@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -72,32 +73,49 @@ public class FeaturesImageTest { @@ -72,32 +73,49 @@ public class FeaturesImageTest {
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(FeaturesImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(FeaturesImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply());
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1);
records.addAll(DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private void testToImageAndBack(FeaturesImage image) throws Throwable {
private static void testToImage(FeaturesImage image) {
testToImage(image, Optional.empty());
}
private static void testToImage(FeaturesImage image, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image)));
}
private static void testToImage(FeaturesImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> FeaturesImage.EMPTY,
FeaturesDelta::new
).test(image, fromRecords);
}
private static List<ApiMessageAndVersion> getImageRecords(FeaturesImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().setMetadataVersion(image.metadataVersion()).build());
FeaturesDelta delta = new FeaturesDelta(FeaturesImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
FeaturesImage nextImage = delta.apply();
assertEquals(image, nextImage);
return writer.records();
}
@Test

68
metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java

@ -20,9 +20,13 @@ package org.apache.kafka.image; @@ -20,9 +20,13 @@ package org.apache.kafka.image;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -71,31 +75,69 @@ public class MetadataImageTest { @@ -71,31 +75,69 @@ public class MetadataImageTest {
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(MetadataImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(MetadataImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply(IMAGE2.provenance()));
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
ImageWriterOptions options = new ImageWriterOptions.Builder()
.setMetadataVersion(IMAGE1.features().metadataVersion())
.build();
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1, options);
records.addAll(FeaturesImageTest.DELTA1_RECORDS);
records.addAll(ClusterImageTest.DELTA1_RECORDS);
records.addAll(TopicsImageTest.DELTA1_RECORDS);
records.addAll(ConfigurationsImageTest.DELTA1_RECORDS);
records.addAll(ClientQuotasImageTest.DELTA1_RECORDS);
records.addAll(ProducerIdsImageTest.DELTA1_RECORDS);
records.addAll(AclsImageTest.DELTA1_RECORDS);
records.addAll(ScramImageTest.DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private static void testToImage(MetadataImage image) {
testToImage(image, new ImageWriterOptions.Builder()
.setMetadataVersion(image.features().metadataVersion())
.build(), Optional.empty());
}
private static void testToImage(MetadataImage image, ImageWriterOptions options) {
testToImage(image, options, Optional.empty());
}
static void testToImage(MetadataImage image, ImageWriterOptions options, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image, options)));
}
private static void testToImage(MetadataImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<MetadataDelta, MetadataImage>(
() -> MetadataImage.EMPTY,
MetadataDelta::new
) {
@Override
public MetadataImage createImageByApplyingDelta(MetadataDelta delta) {
return delta.apply(image.provenance());
}
}.test(image, fromRecords);
}
private void testToImageAndBack(MetadataImage image) throws Throwable {
private static List<ApiMessageAndVersion> getImageRecords(MetadataImage image, ImageWriterOptions options) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder(image).build());
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
MetadataImage nextImage = delta.apply(image.provenance());
assertEquals(image, nextImage);
image.write(writer, options);
return writer.records();
}
}

42
metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java

@ -27,6 +27,7 @@ import org.junit.jupiter.api.Timeout; @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -61,31 +62,48 @@ public class ProducerIdsImageTest { @@ -61,31 +62,48 @@ public class ProducerIdsImageTest {
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(ProducerIdsImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(ProducerIdsImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply());
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1);
records.addAll(DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private void testToImageAndBack(ProducerIdsImage image) throws Throwable {
private static void testToImage(ProducerIdsImage image) {
testToImage(image, Optional.empty());
}
private static void testToImage(ProducerIdsImage image, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image)));
}
private static void testToImage(ProducerIdsImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> ProducerIdsImage.EMPTY,
ProducerIdsDelta::new
).test(image, fromRecords);
}
private static List<ApiMessageAndVersion> getImageRecords(ProducerIdsImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().build());
ProducerIdsDelta delta = new ProducerIdsDelta(ProducerIdsImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
ProducerIdsImage nextImage = delta.apply();
assertEquals(image, nextImage);
return writer.records();
}
}

46
metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java

@ -34,6 +34,7 @@ import java.util.ArrayList; @@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import static org.apache.kafka.clients.admin.ScramMechanism.SCRAM_SHA_256;
@ -113,36 +114,53 @@ public class ScramImageTest { @@ -113,36 +114,53 @@ public class ScramImageTest {
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(ScramImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(ScramImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply());
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1);
records.addAll(DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private void testToImageAndBack(ScramImage image) throws Throwable {
private static void testToImage(ScramImage image) {
testToImage(image, Optional.empty());
}
private static void testToImage(ScramImage image, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image)));
}
private static void testToImage(ScramImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> ScramImage.EMPTY,
ScramDelta::new
).test(image, fromRecords);
}
private static List<ApiMessageAndVersion> getImageRecords(ScramImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().build());
ScramDelta delta = new ScramDelta(ScramImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
ScramImage nextImage = delta.apply();
assertEquals(image, nextImage);
return writer.records();
}
@Test
public void testEmptyWithInvalidIBP() throws Throwable {
public void testEmptyWithInvalidIBP() {
ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build();
RecordListWriter writer = new RecordListWriter();
@ -150,7 +168,7 @@ public class ScramImageTest { @@ -150,7 +168,7 @@ public class ScramImageTest {
}
@Test
public void testImage1withInvalidIBP() throws Throwable {
public void testImage1withInvalidIBP() {
ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_4_IV0).build();
RecordListWriter writer = new RecordListWriter();

58
metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java

@ -41,6 +41,7 @@ import java.util.HashMap; @@ -41,6 +41,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
@ -225,6 +226,11 @@ public class TopicsImageTest { @@ -225,6 +226,11 @@ public class TopicsImageTest {
),
changes.followers().keySet()
);
TopicsImage finalImage = delta.apply();
List<ApiMessageAndVersion> imageRecords = getImageRecords(IMAGE1);
imageRecords.addAll(topicRecords);
testToImage(finalImage, Optional.of(imageRecords));
}
@Test
@ -265,6 +271,11 @@ public class TopicsImageTest { @@ -265,6 +271,11 @@ public class TopicsImageTest {
assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes());
assertEquals(Collections.emptyMap(), changes.leaders());
assertEquals(Collections.emptyMap(), changes.followers());
TopicsImage finalImage = delta.apply();
List<ApiMessageAndVersion> imageRecords = getImageRecords(image);
imageRecords.addAll(topicRecords);
testToImage(finalImage, Optional.of(imageRecords));
}
@Test
@ -365,35 +376,58 @@ public class TopicsImageTest { @@ -365,35 +376,58 @@ public class TopicsImageTest {
new HashSet<>(Arrays.asList(new TopicPartition("zoo", 1), new TopicPartition("zoo", 5))),
changes.followers().keySet()
);
TopicsImage finalImage = delta.apply();
List<ApiMessageAndVersion> imageRecords = getImageRecords(image);
imageRecords.addAll(topicRecords);
testToImage(finalImage, Optional.of(imageRecords));
}
@Test
public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(TopicsImage.EMPTY);
public void testEmptyImageRoundTrip() {
testToImage(TopicsImage.EMPTY);
}
@Test
public void testImage1RoundTrip() throws Throwable {
testToImageAndBack(IMAGE1);
public void testImage1RoundTrip() {
testToImage(IMAGE1);
}
@Test
public void testApplyDelta1() throws Throwable {
public void testApplyDelta1() {
assertEquals(IMAGE2, DELTA1.apply());
// check image1 + delta1 = image2, since records for image1 + delta1 might differ from records from image2
List<ApiMessageAndVersion> records = getImageRecords(IMAGE1);
records.addAll(DELTA1_RECORDS);
testToImage(IMAGE2, records);
}
@Test
public void testImage2RoundTrip() throws Throwable {
testToImageAndBack(IMAGE2);
public void testImage2RoundTrip() {
testToImage(IMAGE2);
}
private static void testToImage(TopicsImage image) {
testToImage(image, Optional.empty());
}
private static void testToImage(TopicsImage image, Optional<List<ApiMessageAndVersion>> fromRecords) {
testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image)));
}
private static void testToImage(TopicsImage image, List<ApiMessageAndVersion> fromRecords) {
// test from empty image stopping each of the various intermediate images along the way
new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<>(
() -> TopicsImage.EMPTY,
TopicsDelta::new
).test(image, fromRecords);
}
private void testToImageAndBack(TopicsImage image) throws Throwable {
private static List<ApiMessageAndVersion> getImageRecords(TopicsImage image) {
RecordListWriter writer = new RecordListWriter();
image.write(writer, new ImageWriterOptions.Builder().build());
TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
RecordTestUtils.replayAll(delta, writer.records());
TopicsImage nextImage = delta.apply();
assertEquals(image, nextImage);
return writer.records();
}
@Test

71
metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java

@ -38,8 +38,11 @@ import java.util.Comparator; @@ -38,8 +38,11 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -95,6 +98,74 @@ public class RecordTestUtils { @@ -95,6 +98,74 @@ public class RecordTestUtils {
replayAll(target, Collections.singletonList(recordAndVersion));
}
public static class TestThroughAllIntermediateImagesLeadingToFinalImageHelper<D, I> {
private final Supplier<I> emptyImageSupplier;
private final Function<I, D> deltaUponImageCreator;
public TestThroughAllIntermediateImagesLeadingToFinalImageHelper(
Supplier<I> emptyImageSupplier, Function<I, D> deltaUponImageCreator
) {
this.emptyImageSupplier = Objects.requireNonNull(emptyImageSupplier);
this.deltaUponImageCreator = Objects.requireNonNull(deltaUponImageCreator);
}
public I getEmptyImage() {
return this.emptyImageSupplier.get();
}
public D createDeltaUponImage(I image) {
return this.deltaUponImageCreator.apply(image);
}
@SuppressWarnings("unchecked")
public I createImageByApplyingDelta(D delta) {
try {
try {
Method method = delta.getClass().getMethod("apply");
return (I) method.invoke(delta);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
} catch (InvocationTargetException e) {
throw new RuntimeException(e.getCause());
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
public void test(I finalImage, List<ApiMessageAndVersion> fromRecords) {
for (int numRecordsForfirstImage = 1; numRecordsForfirstImage <= fromRecords.size(); ++numRecordsForfirstImage) {
// create first image from first numRecordsForfirstImage records
D delta = createDeltaUponImage(getEmptyImage());
RecordTestUtils.replayAll(delta, fromRecords.subList(0, numRecordsForfirstImage));
I firstImage = createImageByApplyingDelta(delta);
// for all possible further batch sizes, apply as many batches as it takes to get to the final image
int remainingRecords = fromRecords.size() - numRecordsForfirstImage;
if (remainingRecords == 0) {
assertEquals(finalImage, firstImage);
} else {
// for all possible further batch sizes...
for (int maxRecordsForSuccessiveBatches = 1; maxRecordsForSuccessiveBatches <= remainingRecords; ++maxRecordsForSuccessiveBatches) {
I latestIntermediateImage = firstImage;
// ... apply as many batches as it takes to get to the final image
int numAdditionalBatches = (int) Math.ceil(remainingRecords * 1.0 / maxRecordsForSuccessiveBatches);
for (int additionalBatchNum = 0; additionalBatchNum < numAdditionalBatches; ++additionalBatchNum) {
// apply up to maxRecordsForSuccessiveBatches records on top of the latest intermediate image
// to obtain the next intermediate image.
delta = createDeltaUponImage(latestIntermediateImage);
int applyFromIndex = numRecordsForfirstImage + additionalBatchNum * maxRecordsForSuccessiveBatches;
int applyToIndex = Math.min(fromRecords.size(), applyFromIndex + maxRecordsForSuccessiveBatches);
RecordTestUtils.replayAll(delta, fromRecords.subList(applyFromIndex, applyToIndex));
latestIntermediateImage = createImageByApplyingDelta(delta);
}
// The final intermediate image received should be the expected final image
assertEquals(finalImage, latestIntermediateImage);
}
}
}
}
}
/**
* Replay a list of record batches.
*

Loading…
Cancel
Save