Browse Source

kafka-1984; java producer may miss an available partition; patched by Jun Rao; reviewed by Ewen Cheslack-Postava, Jay Kreps, and Guozhang Wang

pull/51/head
Jun Rao 10 years ago
parent
commit
10311c1389
  1. 15
      clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
  2. 24
      clients/src/main/java/org/apache/kafka/common/Cluster.java
  3. 29
      clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java

15
clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java

@@ -56,14 +56,15 @@ public class Partitioner {
+ "].");
return partition;
} else if (key == null) {
// choose the next available node in a round-robin fashion
for (int i = 0; i < numPartitions; i++) {
int part = Utils.abs(counter.getAndIncrement()) % numPartitions;
if (partitions.get(part).leader() != null)
return part;
}
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.abs(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.abs(counter.getAndIncrement()) % numPartitions;
return Utils.abs(nextValue) % numPartitions;
}
} else {
// hash the key to choose a partition
return Utils.abs(Utils.murmur2(key)) % numPartitions;

24
clients/src/main/java/org/apache/kafka/common/Cluster.java

@@ -25,6 +25,7 @@ public final class Cluster {
private final List<Node> nodes;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
@@ -68,8 +69,18 @@ public final class Cluster {
}
}
this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet())
this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
String topic = entry.getKey();
List<PartitionInfo> partitionList = entry.getValue();
this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>();
for (PartitionInfo part : partitionList) {
if (part.leader() != null)
availablePartitions.add(part);
}
this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
}
this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
@@ -143,6 +154,15 @@ public final class Cluster {
return this.partitionsByTopic.get(topic);
}
/**
* Get the list of available partitions for this topic, i.e. the partitions of the topic
* whose leader is non-null in this cluster's metadata.
* @param topic The topic name
* @return An unmodifiable list of the topic's partitions that have a leader (empty if none of them do),
*         or null if this cluster has no entry for the topic
*/
public List<PartitionInfo> availablePartitionsForTopic(String topic) {
return this.availablePartitionsByTopic.get(topic);
}
/**
* Get the list of partitions whose leader is this node
* @param nodeId The node id

29
clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java

@@ -33,9 +33,10 @@ public class PartitionerTest {
private Node node2 = new Node(2, "localhost", 101);
private Node[] nodes = new Node[] {node0, node1, node2};
private String topic = "test";
private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
new PartitionInfo(topic, 1, node1, nodes, nodes),
new PartitionInfo(topic, 2, null, nodes, nodes));
// Intentionally make the partition list not in partition order to test the edge cases.
private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
new PartitionInfo(topic, 2, node1, nodes, nodes),
new PartitionInfo(topic, 0, node0, nodes, nodes));
private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
@Test
@@ -50,19 +51,19 @@ public class PartitionerTest {
}
@Test
public void testRoundRobinIsStable() {
int startPart = partitioner.partition("test", null, null, cluster);
public void testRoundRobinWithUnavailablePartitions() {
// When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
// and (2) the available partitions are selected in a round robin way.
int countForPart0 = 0;
int countForPart2 = 0;
for (int i = 1; i <= 100; i++) {
int partition = partitioner.partition("test", null, null, cluster);
assertEquals("Should yield a different partition each call with round-robin partitioner", partition, (startPart + i) % 2);
}
}
@Test
public void testRoundRobinWithDownNode() {
for (int i = 0; i < partitions.size(); i++) {
int part = partitioner.partition("test", null, null, cluster);
assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
if (part == 0)
countForPart0++;
else
countForPart2++;
}
assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
}
}

Loading…
Cancel
Save