Browse Source

MINOR: Add topic config to PartitionsSpec (#5523)

Reviewers: Bob Barrett <bob.barrett@outlook.com>, Ismael Juma <ismael@juma.me.uk>
pull/5752/head
Colin Patrick McCabe 6 years ago committed by Ismael Juma
parent
commit
089d1b154f
  1. 23
      tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
  2. 6
      tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
  3. 15
      tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java

23
tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java

@ -39,11 +39,13 @@ public class PartitionsSpec extends Message { @@ -39,11 +39,13 @@ public class PartitionsSpec extends Message {
private final int numPartitions;
private final short replicationFactor;
private final Map<Integer, List<Integer>> partitionAssignments;
private final Map<String, String> configs;
@JsonCreator
public PartitionsSpec(@JsonProperty("numPartitions") int numPartitions,
@JsonProperty("replicationFactor") short replicationFactor,
@JsonProperty("partitionAssignments") Map<Integer, List<Integer>> partitionAssignments) {
@JsonProperty("partitionAssignments") Map<Integer, List<Integer>> partitionAssignments,
@JsonProperty("configs") Map<String, String> configs) {
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
HashMap<Integer, List<Integer>> partMap = new HashMap<>();
@ -60,6 +62,11 @@ public class PartitionsSpec extends Message { @@ -60,6 +62,11 @@ public class PartitionsSpec extends Message {
}
}
this.partitionAssignments = Collections.unmodifiableMap(partMap);
if (configs == null) {
this.configs = Collections.emptyMap();
} else {
this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
}
}
@JsonProperty
@ -90,15 +97,25 @@ public class PartitionsSpec extends Message { @@ -90,15 +97,25 @@ public class PartitionsSpec extends Message {
return partitionAssignments;
}
@JsonProperty
public Map<String, String> configs() {
return configs;
}
public NewTopic newTopic(String topicName) {
NewTopic newTopic;
if (partitionAssignments.isEmpty()) {
int effectiveNumPartitions = numPartitions <= 0 ?
DEFAULT_NUM_PARTITIONS : numPartitions;
short effectiveReplicationFactor = replicationFactor <= 0 ?
DEFAULT_REPLICATION_FACTOR : replicationFactor;
return new NewTopic(topicName, effectiveNumPartitions, effectiveReplicationFactor);
newTopic = new NewTopic(topicName, effectiveNumPartitions, effectiveReplicationFactor);
} else {
return new NewTopic(topicName, partitionAssignments);
newTopic = new NewTopic(topicName, partitionAssignments);
}
if (!configs.isEmpty()) {
newTopic.configs(configs);
}
return newTopic;
}
}

6
tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java

@ -58,12 +58,12 @@ public class JsonSerializationTest { @@ -58,12 +58,12 @@ public class JsonSerializationTest {
verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
0, null, null, 0));
verify(new TopicsSpec());
verify(new PartitionsSpec(0, (short) 0, null));
verify(new PartitionsSpec(0, (short) 0, null, null));
Map<Integer, List<Integer>> partitionAssignments = new HashMap<Integer, List<Integer>>();
partitionAssignments.put(0, Arrays.asList(1, 2, 3));
partitionAssignments.put(1, Arrays.asList(1, 2, 3));
verify(new PartitionsSpec(0, (short) 0, partitionAssignments));
verify(new PartitionsSpec(0, (short) 0, null));
verify(new PartitionsSpec(0, (short) 0, partitionAssignments, null));
verify(new PartitionsSpec(0, (short) 0, null, null));
}
private <T> void verify(T val1) throws Exception {

15
tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java

@ -26,6 +26,8 @@ import java.util.HashMap; @@ -26,6 +26,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.trogdor.common.JsonUtil;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -40,13 +42,13 @@ public class TopicsSpecTest { @@ -40,13 +42,13 @@ public class TopicsSpecTest {
static {
FOO = new TopicsSpec();
PARTSA = new PartitionsSpec(3, (short) 3, null);
PARTSA = new PartitionsSpec(3, (short) 3, null, null);
FOO.set("topicA[0-2]", PARTSA);
Map<Integer, List<Integer>> assignmentsB = new HashMap<>();
assignmentsB.put(0, Arrays.asList(0, 1, 2));
assignmentsB.put(1, Arrays.asList(2, 3, 4));
PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB);
PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB, null);
FOO.set("topicB", PARTSB);
}
@ -77,4 +79,13 @@ public class TopicsSpecTest { @@ -77,4 +79,13 @@ public class TopicsSpecTest {
assertEquals(Integer.valueOf(1), partsBNumbers.get(1));
assertEquals(2, partsBNumbers.size());
}
@Test
public void testPartitionsSpec() throws Exception {
String text = "{\"numPartitions\": 5, \"configs\": {\"foo\": \"bar\"}}";
PartitionsSpec spec = JsonUtil.JSON_SERDE.readValue(text, PartitionsSpec.class);
assertEquals(5, spec.numPartitions());
assertEquals("bar", spec.configs().get("foo"));
assertEquals(1, spec.configs().size());
}
}

Loading…
Cancel
Save