Browse Source

Disallow clients to set replicaId in FetchRequest; kafka-699; patched by Jun Rao; reviewed by Neha Narkhede

0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
777f662201
  1. 24
      core/src/main/scala/kafka/api/FetchRequest.scala
  2. 6
      core/src/main/scala/kafka/javaapi/FetchRequest.scala

24
core/src/main/scala/kafka/api/FetchRequest.scala

@ -58,7 +58,7 @@ object FetchRequest {
} }
} }
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId, correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId, clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId, replicaId: Int = Request.OrdinaryConsumerId,
@ -72,6 +72,23 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
*/ */
lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
/**
* Public constructor for the clients
*/
def this(correlationId: Int,
clientId: String,
maxWait: Int,
minBytes: Int,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) {
this(versionId = FetchRequest.CurrentVersion,
correlationId = correlationId,
clientId = clientId,
replicaId = Request.OrdinaryConsumerId,
maxWait = maxWait,
minBytes= minBytes,
requestInfo = requestInfo)
}
def writeTo(buffer: ByteBuffer) { def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId) buffer.putShort(versionId)
buffer.putInt(correlationId) buffer.putInt(correlationId)
@ -144,7 +161,10 @@ class FetchRequestBuilder() {
this this
} }
def replicaId(replicaId: Int): FetchRequestBuilder = { /**
* Only for internal use. Clients shouldn't set replicaId.
*/
private[kafka] def replicaId(replicaId: Int): FetchRequestBuilder = {
this.replicaId = replicaId this.replicaId = replicaId
this this
} }

6
core/src/main/scala/kafka/javaapi/FetchRequest.scala

@ -18,14 +18,12 @@
package kafka.javaapi package kafka.javaapi
import scala.collection.JavaConversions import scala.collection.JavaConversions
import kafka.api.PartitionFetchInfo
import java.nio.ByteBuffer import java.nio.ByteBuffer
import kafka.common.TopicAndPartition import kafka.common.TopicAndPartition
import kafka.api.{Request, PartitionFetchInfo}
class FetchRequest(correlationId: Int, class FetchRequest(correlationId: Int,
clientId: String, clientId: String,
replicaId: Int,
maxWait: Int, maxWait: Int,
minBytes: Int, minBytes: Int,
requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) { requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
@ -35,7 +33,7 @@ class FetchRequest(correlationId: Int,
kafka.api.FetchRequest( kafka.api.FetchRequest(
correlationId = correlationId, correlationId = correlationId,
clientId = clientId, clientId = clientId,
replicaId = replicaId, replicaId = Request.OrdinaryConsumerId,
maxWait = maxWait, maxWait = maxWait,
minBytes = minBytes, minBytes = minBytes,
requestInfo = scalaMap requestInfo = scalaMap

Loading…
Cancel
Save