Browse Source

KAFKA-7962: Avoid NPE for StickyAssignor (#6308)

* KAFKA-7962: StickyAssignor: throws NullPointerException during assignments if topic is deleted

https://issues.apache.org/jira/browse/KAFKA-7962

Consumer using StickyAssignor throws NullPointerException if a subscribed topic was removed.

* addressed vahidhashemian's comments

* lower NPath Complexity

* added a unit test
pull/6345/head
huxi 6 years ago committed by Vahid Hashemian
parent
commit
a1f7925d23
  1. 6
      clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
  2. 32
      clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java

6
clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java

@ -219,13 +219,13 @@ public class StickyAssignor extends AbstractPartitionAssignor { @@ -219,13 +219,13 @@ public class StickyAssignor extends AbstractPartitionAssignor {
for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
String consumer = entry.getKey();
consumer2AllPotentialPartitions.put(consumer, new ArrayList<TopicPartition>());
for (String topic: entry.getValue().topics()) {
entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {
for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
TopicPartition topicPartition = new TopicPartition(topic, i);
consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
partition2AllPotentialConsumers.get(topicPartition).add(consumer);
}
}
});
// add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
if (!currentAssignment.containsKey(consumer))
@ -705,6 +705,8 @@ public class StickyAssignor extends AbstractPartitionAssignor { @@ -705,6 +705,8 @@ public class StickyAssignor extends AbstractPartitionAssignor {
*/
private <T> boolean hasIdenticalListElements(Collection<List<T>> col) {
Iterator<List<T>> it = col.iterator();
if (!it.hasNext())
return true;
List<T> cur = it.next();
while (it.hasNext()) {
List<T> next = it.next();

32
clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java

@ -606,6 +606,38 @@ public class StickyAssignorTest { @@ -606,6 +606,38 @@ public class StickyAssignorTest {
}
}
@Test
public void testAssignmentUpdatedForDeletedTopic() {
String consumerId = "consumer";
Map<String, Integer> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put("topic01", 1);
partitionsPerTopic.put("topic03", 100);
Map<String, Subscription> subscriptions =
Collections.singletonMap(consumerId, new Subscription(topics("topic01", "topic02", "topic03")));
Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
assertEquals(assignment.values().stream().mapToInt(topicPartitions -> topicPartitions.size()).sum(), 1 + 100);
assertEquals(Collections.singleton(consumerId), assignment.keySet());
assertTrue(isFullyBalanced(assignment));
}
@Test
public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() {
String topic = "topic01";
String consumer = "consumer01";
Map<String, Integer> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic, 3);
Map<String, Subscription> subscriptions = new HashMap<>();
subscriptions.put(consumer, new Subscription(topics(topic)));
Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
subscriptions.put(consumer, new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer))));
assignment = assignor.assign(Collections.emptyMap(), subscriptions);
assertEquals(assignment.size(), 1);
assertTrue(assignment.get(consumer).isEmpty());
}
private String getTopicName(int i, int maxNum) {
return getCanonicalName("t", i, maxNum);
}

Loading…
Cancel
Save