diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 4d9a4252011..8a2cb1237ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -15,6 +15,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,7 +84,8 @@ public class SubscriptionState { throw new IllegalStateException("Topic " + topic + " was never subscribed to."); this.subscribedTopics.remove(topic); this.needsPartitionAssignment = true; - for (TopicPartition tp: assignedPartitions()) + final List existingAssignedPartitions = new ArrayList<>(assignedPartitions()); + for (TopicPartition tp: existingAssignedPartitions) if (topic.equals(tp.topic())) clearPartition(tp); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 319751c374c..c47f3fb699d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -73,6 +73,27 @@ public class SubscriptionStateTest { assertAllPositions(tp0, null); assertEquals(Collections.singleton(tp1), state.assignedPartitions()); } + + @Test + public void topicUnsubscription() { + final String topic = "test"; + state.subscribe(topic); + assertEquals(1, state.subscribedTopics().size()); + assertTrue(state.assignedPartitions().isEmpty()); + assertTrue(state.partitionsAutoAssigned()); + state.changePartitionAssignment(asList(tp0)); + state.committed(tp0, 1); + state.fetched(tp0, 1); + state.consumed(tp0, 1); + assertAllPositions(tp0, 1L); + state.changePartitionAssignment(asList(tp1)); + assertAllPositions(tp0, null); + assertEquals(Collections.singleton(tp1), state.assignedPartitions()); + + state.unsubscribe(topic); + assertEquals(0, state.subscribedTopics().size()); + assertTrue(state.assignedPartitions().isEmpty()); + } @Test(expected = IllegalArgumentException.class) public void cantChangeFetchPositionForNonAssignedPartition() { diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3eb5f95731a..cca6e94af1b 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -217,6 +217,25 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.close() } + def testUnsubscribeTopic() { + val callback = new TestConsumerReassignmentCallback() + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test + val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + + try { + consumer0.subscribe(topic) + + // the initial subscription should cause a callback execution + while (callback.callsToAssigned == 0) + consumer0.poll(50) + + consumer0.unsubscribe(topic) + assertEquals(0, consumer0.subscriptions.size()) + } finally { + consumer0.close() + } + } + private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback { var callsToAssigned = 0 var callsToRevoked = 0