diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index b4fb87451a9..79687471b15 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -58,13 +58,13 @@ object FetchRequest { } } -case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, - correlationId: Int = FetchRequest.DefaultCorrelationId, - clientId: String = ConsumerConfig.DefaultClientId, - replicaId: Int = Request.OrdinaryConsumerId, - maxWait: Int = FetchRequest.DefaultMaxWait, - minBytes: Int = FetchRequest.DefaultMinBytes, - requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) +case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion, + correlationId: Int = FetchRequest.DefaultCorrelationId, + clientId: String = ConsumerConfig.DefaultClientId, + replicaId: Int = Request.OrdinaryConsumerId, + maxWait: Int = FetchRequest.DefaultMaxWait, + minBytes: Int = FetchRequest.DefaultMinBytes, + requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) extends RequestOrResponse(Some(RequestKeys.FetchKey)) { /** @@ -72,6 +72,23 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, */ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) + /** + * Public constructor for the clients + */ + def this(correlationId: Int, + clientId: String, + maxWait: Int, + minBytes: Int, + requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) { + this(versionId = FetchRequest.CurrentVersion, + correlationId = correlationId, + clientId = clientId, + replicaId = Request.OrdinaryConsumerId, + maxWait = maxWait, + minBytes= minBytes, + requestInfo = requestInfo) + } + def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) @@ -144,7 +161,10 @@ class FetchRequestBuilder() { this } - def replicaId(replicaId: Int): FetchRequestBuilder = { + /** + * Only for internal use. Clients shouldn't set replicaId. + */ + private[kafka] def replicaId(replicaId: Int): FetchRequestBuilder = { this.replicaId = replicaId this } diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala index 44d148e7d89..b4752404895 100644 --- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala @@ -18,14 +18,12 @@ package kafka.javaapi import scala.collection.JavaConversions -import kafka.api.PartitionFetchInfo import java.nio.ByteBuffer import kafka.common.TopicAndPartition - +import kafka.api.{Request, PartitionFetchInfo} class FetchRequest(correlationId: Int, clientId: String, - replicaId: Int, maxWait: Int, minBytes: Int, requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) { @@ -35,7 +33,7 @@ class FetchRequest(correlationId: Int, kafka.api.FetchRequest( correlationId = correlationId, clientId = clientId, - replicaId = replicaId, + replicaId = Request.OrdinaryConsumerId, maxWait = maxWait, minBytes = minBytes, requestInfo = scalaMap