From bfbd3acbf71fa1b913de154f7ffa12aead28a2d2 Mon Sep 17 00:00:00 2001 From: Fangmin Lv Date: Wed, 15 Apr 2015 11:16:58 -0700 Subject: [PATCH] KAFKA-2056; Fix transient testRangePartitionAssignor failure; reviewed by Guozhang Wang --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 6 ++++++ .../scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 +--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 4afda8bee18..849284ad2cf 100755 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -112,6 +112,9 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { assignmentForConsumer += (topicPartition -> threadId) }) } + + // assign Map.empty for the consumers which are not associated with topic partitions + ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId)) partitionAssignment } } @@ -164,6 +167,9 @@ class RangeAssignor() extends PartitionAssignor with Logging { } } } + + // assign Map.empty for the consumers which are not associated with topic partitions + ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId)) partitionAssignment } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e250b94626c..aa8d9404a3e 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -684,9 +684,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, releasePartitionOwnership(topicRegistry) val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val globalPartitionAssignment = partitionAssignor.assign(assignmentContext) - val partitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse( - mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId] - ) + val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))