From 089d1b154fff1536fe7953ffe157a203b4a24455 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Fri, 5 Oct 2018 11:45:50 -0700 Subject: [PATCH] MINOR: Add topic config to PartitionsSpec (#5523) Reviewers: Bob Barrett , Ismael Juma --- .../trogdor/workload/PartitionsSpec.java | 23 ++++++++++++++++--- .../trogdor/common/JsonSerializationTest.java | 6 ++--- .../trogdor/workload/TopicsSpecTest.java | 15 ++++++++++-- 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java index a6ebb2184b0..c1dc7c68761 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java @@ -39,11 +39,13 @@ public class PartitionsSpec extends Message { private final int numPartitions; private final short replicationFactor; private final Map> partitionAssignments; + private final Map configs; @JsonCreator public PartitionsSpec(@JsonProperty("numPartitions") int numPartitions, @JsonProperty("replicationFactor") short replicationFactor, - @JsonProperty("partitionAssignments") Map> partitionAssignments) { + @JsonProperty("partitionAssignments") Map> partitionAssignments, + @JsonProperty("configs") Map configs) { this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; HashMap> partMap = new HashMap<>(); @@ -60,6 +62,11 @@ public class PartitionsSpec extends Message { } } this.partitionAssignments = Collections.unmodifiableMap(partMap); + if (configs == null) { + this.configs = Collections.emptyMap(); + } else { + this.configs = Collections.unmodifiableMap(new HashMap<>(configs)); + } } @JsonProperty @@ -90,15 +97,25 @@ public class PartitionsSpec extends Message { return partitionAssignments; } + @JsonProperty + public Map configs() { + return configs; + } + public NewTopic newTopic(String topicName) { + NewTopic newTopic; if (partitionAssignments.isEmpty()) { int effectiveNumPartitions = numPartitions <= 0 ? DEFAULT_NUM_PARTITIONS : numPartitions; short effectiveReplicationFactor = replicationFactor <= 0 ? DEFAULT_REPLICATION_FACTOR : replicationFactor; - return new NewTopic(topicName, effectiveNumPartitions, effectiveReplicationFactor); + newTopic = new NewTopic(topicName, effectiveNumPartitions, effectiveReplicationFactor); } else { - return new NewTopic(topicName, partitionAssignments); + newTopic = new NewTopic(topicName, partitionAssignments); + } + if (!configs.isEmpty()) { + newTopic.configs(configs); } + return newTopic; } } diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java index ea1eda6ee85..e7809cd0c80 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java @@ -58,12 +58,12 @@ public class JsonSerializationTest { verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null, 0, null, null, 0)); verify(new TopicsSpec()); - verify(new PartitionsSpec(0, (short) 0, null)); + verify(new PartitionsSpec(0, (short) 0, null, null)); Map> partitionAssignments = new HashMap>(); partitionAssignments.put(0, Arrays.asList(1, 2, 3)); partitionAssignments.put(1, Arrays.asList(1, 2, 3)); - verify(new PartitionsSpec(0, (short) 0, partitionAssignments)); - verify(new PartitionsSpec(0, (short) 0, null)); + verify(new PartitionsSpec(0, (short) 0, partitionAssignments, null)); + verify(new PartitionsSpec(0, (short) 0, null, null)); } private void verify(T val1) throws Exception { diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java index f86ca0f1f57..51c6dfdd9a2 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.kafka.trogdor.common.JsonUtil; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -40,13 +42,13 @@ public class TopicsSpecTest { static { FOO = new TopicsSpec(); - PARTSA = new PartitionsSpec(3, (short) 3, null); + PARTSA = new PartitionsSpec(3, (short) 3, null, null); FOO.set("topicA[0-2]", PARTSA); Map> assignmentsB = new HashMap<>(); assignmentsB.put(0, Arrays.asList(0, 1, 2)); assignmentsB.put(1, Arrays.asList(2, 3, 4)); - PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB); + PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB, null); FOO.set("topicB", PARTSB); } @@ -77,4 +79,13 @@ public class TopicsSpecTest { assertEquals(Integer.valueOf(1), partsBNumbers.get(1)); assertEquals(2, partsBNumbers.size()); } + + @Test + public void testPartitionsSpec() throws Exception { + String text = "{\"numPartitions\": 5, \"configs\": {\"foo\": \"bar\"}}"; + PartitionsSpec spec = JsonUtil.JSON_SERDE.readValue(text, PartitionsSpec.class); + assertEquals(5, spec.numPartitions()); + assertEquals("bar", spec.configs().get("foo")); + assertEquals(1, spec.configs().size()); + } }