From a1f7925d23be0b81cb77561d2113443df52c6f74 Mon Sep 17 00:00:00 2001 From: huxi Date: Thu, 28 Feb 2019 14:27:38 +0800 Subject: [PATCH] KAFKA-7962: Avoid NPE for StickyAssignor (#6308) * KAFKA-7962: StickyAssignor: throws NullPointerException during assignments if topic is deleted https://issues.apache.org/jira/browse/KAFKA-7962 Consumer using StickyAssignor throws NullPointerException if a subscribed topic was removed. * addressed vahidhashemian's comments * lower NPath Complexity * added a unit test --- .../clients/consumer/StickyAssignor.java | 6 ++-- .../clients/consumer/StickyAssignorTest.java | 32 +++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index 4be34c2fdb1..ee537eba788 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -219,13 +219,13 @@ public class StickyAssignor extends AbstractPartitionAssignor { for (Entry entry: subscriptions.entrySet()) { String consumer = entry.getKey(); consumer2AllPotentialPartitions.put(consumer, new ArrayList()); - for (String topic: entry.getValue().topics()) { + entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> { for (int i = 0; i < partitionsPerTopic.get(topic); ++i) { TopicPartition topicPartition = new TopicPartition(topic, i); consumer2AllPotentialPartitions.get(consumer).add(topicPartition); partition2AllPotentialConsumers.get(topicPartition).add(consumer); } - } + }); // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist if (!currentAssignment.containsKey(consumer)) @@ -705,6 +705,8 @@ public class StickyAssignor extends AbstractPartitionAssignor { */ private boolean hasIdenticalListElements(Collection> col) { Iterator> it = col.iterator(); + if (!it.hasNext()) + return true; List cur = it.next(); while (it.hasNext()) { List next = it.next(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index 31cee7edbf0..32ba16a4820 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -606,6 +606,38 @@ public class StickyAssignorTest { } } + @Test + public void testAssignmentUpdatedForDeletedTopic() { + String consumerId = "consumer"; + + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put("topic01", 1); + partitionsPerTopic.put("topic03", 100); + Map subscriptions = + Collections.singletonMap(consumerId, new Subscription(topics("topic01", "topic02", "topic03"))); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(assignment.values().stream().mapToInt(topicPartitions -> topicPartitions.size()).sum(), 1 + 100); + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() { + String topic = "topic01"; + String consumer = "consumer01"; + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + Map subscriptions = new HashMap<>(); + subscriptions.put(consumer, new Subscription(topics(topic))); + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + subscriptions.put(consumer, new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + + assignment = assignor.assign(Collections.emptyMap(), subscriptions); + assertEquals(assignment.size(), 1); + assertTrue(assignment.get(consumer).isEmpty()); + } + private String getTopicName(int i, int maxNum) { return getCanonicalName("t", i, maxNum); }