Browse Source

KAFKA-1071; The random partition selected in DefaultEventHandler is not random across producer instances; reviewed by Neha Narkhede and Jun Rao

pull/10/head
Guozhang Wang 11 years ago committed by Jun Rao
parent
commit
93921a3a57
  1. 4
      core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala

4
core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala

@ -22,6 +22,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} @@ -22,6 +22,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging, SystemTime}
import scala.util.Random
import scala.collection.{Seq, Map}
import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
import java.util.concurrent.atomic._
@ -36,7 +37,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig, @@ -36,7 +37,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
val partitionCounter = new AtomicInteger(0)
val correlationId = new AtomicInteger(0)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
@ -217,7 +217,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, @@ -217,7 +217,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
val index = Utils.abs(Random.nextInt) % availablePartitions.size
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)
partitionId

Loading…
Cancel
Save