Browse Source

KAFKA-2381: Fix concurrent modification on assigned partition while looping over it; reviewed by Jason Gustafson, Aditya Auradkar, Ewen Cheslack-Postava, Ismael Juma and Guozhang Wang

pull/70/merge
Ashish Singh 9 years ago committed by Guozhang Wang
parent
commit
269c2407d4
  1. 4
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  2. 21
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
  3. 19
      core/src/test/scala/integration/kafka/api/ConsumerTest.scala

4
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

@ -15,6 +15,7 @@ package org.apache.kafka.clients.consumer.internals; @@ -15,6 +15,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -83,7 +84,8 @@ public class SubscriptionState { @@ -83,7 +84,8 @@ public class SubscriptionState {
throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
this.subscribedTopics.remove(topic);
this.needsPartitionAssignment = true;
for (TopicPartition tp: assignedPartitions())
final List<TopicPartition> existingAssignedPartitions = new ArrayList<>(assignedPartitions());
for (TopicPartition tp: existingAssignedPartitions)
if (topic.equals(tp.topic()))
clearPartition(tp);
}

21
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java

@ -74,6 +74,27 @@ public class SubscriptionStateTest { @@ -74,6 +74,27 @@ public class SubscriptionStateTest {
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
}
@Test
public void topicUnsubscription() {
final String topic = "test";
state.subscribe(topic);
assertEquals(1, state.subscribedTopics().size());
assertTrue(state.assignedPartitions().isEmpty());
assertTrue(state.partitionsAutoAssigned());
state.changePartitionAssignment(asList(tp0));
state.committed(tp0, 1);
state.fetched(tp0, 1);
state.consumed(tp0, 1);
assertAllPositions(tp0, 1L);
state.changePartitionAssignment(asList(tp1));
assertAllPositions(tp0, null);
assertEquals(Collections.singleton(tp1), state.assignedPartitions());
state.unsubscribe(topic);
assertEquals(0, state.subscribedTopics().size());
assertTrue(state.assignedPartitions().isEmpty());
}
@Test(expected = IllegalArgumentException.class)
public void cantChangeFetchPositionForNonAssignedPartition() {
state.fetched(tp0, 1);

19
core/src/test/scala/integration/kafka/api/ConsumerTest.scala

@ -217,6 +217,25 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -217,6 +217,25 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
consumer0.close()
}
def testUnsubscribeTopic() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
try {
consumer0.subscribe(topic)
// the initial subscription should cause a callback execution
while (callback.callsToAssigned == 0)
consumer0.poll(50)
consumer0.unsubscribe(topic)
assertEquals(0, consumer0.subscriptions.size())
} finally {
consumer0.close()
}
}
private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
var callsToAssigned = 0
var callsToRevoked = 0

Loading…
Cancel
Save