Browse Source

KAFKA-2056; Fix transient testRangePartitionAssignor failure; reviewed by Guozhang Wang

pull/56/head
Fangmin Lv 10 years ago committed by Guozhang Wang
parent
commit
bfbd3acbf7
  1. 6
      core/src/main/scala/kafka/consumer/PartitionAssignor.scala
  2. 4
      core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

6
core/src/main/scala/kafka/consumer/PartitionAssignor.scala

@ -112,6 +112,9 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { @@ -112,6 +112,9 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
assignmentForConsumer += (topicPartition -> threadId)
})
}
// assign Map.empty for the consumers which are not associated with topic partitions
ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId))
partitionAssignment
}
}
@ -164,6 +167,9 @@ class RangeAssignor() extends PartitionAssignor with Logging { @@ -164,6 +167,9 @@ class RangeAssignor() extends PartitionAssignor with Logging {
}
}
}
// assign Map.empty for the consumers which are not associated with topic partitions
ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId))
partitionAssignment
}
}

4
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

@ -684,9 +684,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, @@ -684,9 +684,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
releasePartitionOwnership(topicRegistry)
val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
val partitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse(
mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId]
)
val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

Loading…
Cancel
Save