Browse Source

KAFKA-6796; Fix surprising UNKNOWN_TOPIC error from requests to non-replicas (#4883)

Currently if the client sends a produce request or a fetch request to a broker which isn't a replica,
we return UNKNOWN_TOPIC_OR_PARTITION. This is a bit surprising to see when the topic actually
exists. It would be better to return NOT_LEADER to avoid confusion. Clients typically handle both errors by refreshing metadata and retrying, so changing this should not cause any change in behavior on the client. This case can be hit following a partition reassignment after the leader is moved and the local replica is deleted.

To validate the current behavior and the fix, I've added integration tests for the fetch and produce APIs.
pull/4926/head
Jason Gustafson 7 years ago committed by Ismael Juma
parent
commit
acd669e424
  1. 32
      core/src/main/scala/kafka/server/KafkaApis.scala
  2. 23
      core/src/main/scala/kafka/server/ReplicaManager.scala
  3. 21
      core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
  4. 127
      core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
  5. 23
      core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala

32
core/src/main/scala/kafka/server/KafkaApis.scala

@ -287,7 +287,7 @@ class KafkaApis(val requestChannel: RequestChannel,
for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) { for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED) unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition.topic)) else if (!metadataCache.contains(topicPartition))
nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION) nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
else else
authorizedTopicRequestInfoBldr += (topicPartition -> partitionData) authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
@ -401,7 +401,7 @@ class KafkaApis(val requestChannel: RequestChannel,
for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) { for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition.topic)) else if (!metadataCache.contains(topicPartition))
nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else else
authorizedRequestInfo += (topicPartition -> memoryRecords) authorizedRequestInfo += (topicPartition -> memoryRecords)
@ -502,13 +502,13 @@ class KafkaApis(val requestChannel: RequestChannel,
if (fetchRequest.isFromFollower()) { if (fetchRequest.isFromFollower()) {
// The follower must have ClusterAction on ClusterResource in order to fetch partition data. // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
fetchContext.foreachPartition((part, data) => { fetchContext.foreachPartition((topicPartition, data) => {
if (!metadataCache.contains(part.topic)) { if (!metadataCache.contains(topicPartition)) {
erroneous += part -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
} else { } else {
interesting += (part -> data) interesting += (topicPartition -> data)
} }
}) })
} else { } else {
@ -520,17 +520,17 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
} else { } else {
// Regular Kafka consumers need READ permission on each partition they are fetching. // Regular Kafka consumers need READ permission on each partition they are fetching.
fetchContext.foreachPartition((part, data) => { fetchContext.foreachPartition((topicPartition, data) => {
if (!authorize(request.session, Read, new Resource(Topic, part.topic))) if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
else if (!metadataCache.contains(part.topic)) else if (!metadataCache.contains(topicPartition))
erroneous += part -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
else else
interesting += (part -> data) interesting += (topicPartition -> data)
}) })
} }
@ -1062,7 +1062,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// version 0 reads offsets from ZK // version 0 reads offsets from ZK
val authorizedPartitionData = authorizedPartitions.map { topicPartition => val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
try { try {
if (!metadataCache.contains(topicPartition.topic)) if (!metadataCache.contains(topicPartition))
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION) (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
else { else {
val payloadOpt = zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition) val payloadOpt = zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
@ -1508,7 +1508,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic))) if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic)))
unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse( unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED) DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition.topic)) else if (!metadataCache.contains(topicPartition))
nonExistingTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse( nonExistingTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse(
DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION) DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION)
else else
@ -1720,7 +1720,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) || if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) ||
!authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) !authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(topicPartition.topic)) else if (!metadataCache.contains(topicPartition))
nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
else else
authorizedPartitions.add(topicPartition) authorizedPartitions.add(topicPartition)
@ -1806,7 +1806,7 @@ class KafkaApis(val requestChannel: RequestChannel,
for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) { for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(topicPartition.topic)) else if (!metadataCache.contains(topicPartition))
nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
else else
authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset) authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset)

23
core/src/main/scala/kafka/server/ReplicaManager.scala

@ -426,8 +426,16 @@ class ReplicaManager(val config: KafkaConfig,
def getPartitionAndLeaderReplicaIfLocal(topicPartition: TopicPartition): (Partition, Replica) = { def getPartitionAndLeaderReplicaIfLocal(topicPartition: TopicPartition): (Partition, Replica) = {
val partitionOpt = getPartition(topicPartition) val partitionOpt = getPartition(topicPartition)
partitionOpt match { partitionOpt match {
case None if metadataCache.contains(topicPartition) =>
// The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which
// forces clients to refresh metadata to find the new location. This can happen, for example,
// during a partition reassignment if a produce request from the client is sent to a broker after
// the local replica has been deleted.
throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $topicPartition")
case None => case None =>
throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist on $localBrokerId") throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist")
case Some(partition) => case Some(partition) =>
if (partition eq ReplicaManager.OfflinePartition) if (partition eq ReplicaManager.OfflinePartition)
throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId") throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
@ -736,17 +744,8 @@ class ReplicaManager(val config: KafkaConfig,
Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")))) Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
} else { } else {
try { try {
val partitionOpt = getPartition(topicPartition) val (partition, _) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
val info = partitionOpt match { val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
case Some(partition) =>
if (partition eq ReplicaManager.OfflinePartition)
throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
.format(topicPartition, localBrokerId))
}
val numAppendedMessages = info.numMessages val numAppendedMessages = info.numMessages
// update stats for successfully appended bytes and messages as bytesInRate and messageInRate // update stats for successfully appended bytes and messages as bytesInRate and messageInRate

21
core/src/test/scala/unit/kafka/server/FetchRequestTest.scala

@ -170,6 +170,27 @@ class FetchRequestTest extends BaseRequestTest {
assertEquals(0, records(partitionData).map(_.sizeInBytes).sum) assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
} }
@Test
def testFetchRequestToNonReplica(): Unit = {
val topic = "topic"
val partition = 0
val topicPartition = new TopicPartition(topic, partition)
// Create a single-partition topic and find a broker which is not the leader
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, 1, servers)
val leader = partitionToLeader(partition)
val nonReplicaOpt = servers.find(_.config.brokerId != leader)
assertTrue(nonReplicaOpt.isDefined)
val nonReplicaId = nonReplicaOpt.get.config.brokerId
// Send the fetch request to the non-replica and verify the error code
val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024,
Seq(topicPartition))).build()
val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest)
val partitionData = fetchResponse.responseData.get(topicPartition)
assertEquals(Errors.NOT_LEADER_FOR_PARTITION, partitionData.error)
}
/** /**
* Tests that down-conversions dont leak memory. Large down conversions are triggered * Tests that down-conversions dont leak memory. Large down conversions are triggered
* in the server. The client closes its connection after reading partial data when the * in the server. The client closes its connection after reading partial data when the

127
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

@ -20,6 +20,7 @@ package kafka.server
import java.lang.{Long => JLong} import java.lang.{Long => JLong}
import java.net.InetAddress import java.net.InetAddress
import java.util import java.util
import java.util.Collections
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0} import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0}
import kafka.cluster.Replica import kafka.cluster.Replica
@ -40,6 +41,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
@ -106,6 +108,84 @@ class KafkaApisTest {
) )
} }
@Test
def testOffsetCommitWithInvalidPartition(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 1)
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val partitionOffsetCommitData = new OffsetCommitRequest.PartitionData(15L, "")
val (offsetCommitRequest, request) = buildRequest(new OffsetCommitRequest.Builder("groupId",
Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
val capturedResponse = expectThrottleCallbackAndInvoke()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleOffsetCommitRequest(request)
val response = readResponse(ApiKeys.OFFSET_COMMIT, offsetCommitRequest, capturedResponse)
.asInstanceOf[OffsetCommitResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData().get(invalidTopicPartition))
}
checkInvalidPartition(-1)
checkInvalidPartition(1) // topic has only one partition
}
@Test
def testTxnOffsetCommitWithInvalidPartition(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 1)
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "")
val (offsetCommitRequest, request) = buildRequest(new TxnOffsetCommitRequest.Builder("txnlId", "groupId",
15L, 0.toShort, Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
val capturedResponse = expectThrottleCallbackAndInvoke()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleTxnOffsetCommitRequest(request)
val response = readResponse(ApiKeys.TXN_OFFSET_COMMIT, offsetCommitRequest, capturedResponse)
.asInstanceOf[TxnOffsetCommitResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
}
checkInvalidPartition(-1)
checkInvalidPartition(1) // topic has only one partition
}
@Test
def testAddPartitionsToTxnWithInvalidPartition(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 1)
def checkInvalidPartition(invalidPartitionId: Int): Unit = {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val (addPartitionsToTxnRequest, request) = buildRequest(new AddPartitionsToTxnRequest.Builder(
"txnlId", 15L, 0.toShort, List(invalidTopicPartition).asJava))
val capturedResponse = expectThrottleCallbackAndInvoke()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
createKafkaApis().handleAddPartitionToTxnRequest(request)
val response = readResponse(ApiKeys.ADD_PARTITIONS_TO_TXN, addPartitionsToTxnRequest, capturedResponse)
.asInstanceOf[AddPartitionsToTxnResponse]
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
}
checkInvalidPartition(-1)
checkInvalidPartition(1) // topic has only one partition
}
@Test(expected = classOf[UnsupportedVersionException]) @Test(expected = classOf[UnsupportedVersionException])
def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = { def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null) createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null)
@ -284,8 +364,6 @@ class KafkaApisTest {
val timestamp: JLong = time.milliseconds() val timestamp: JLong = time.milliseconds()
val limitOffset = 15L val limitOffset = 15L
val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
val replica = EasyMock.mock(classOf[Replica]) val replica = EasyMock.mock(classOf[Replica])
val log = EasyMock.mock(classOf[Log]) val log = EasyMock.mock(classOf[Log])
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica) EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
@ -295,8 +373,7 @@ class KafkaApisTest {
EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = limitOffset)) EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = limitOffset))
EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log)) EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
EasyMock.expect(log.fetchOffsetsByTimestamp(timestamp)).andReturn(Some(TimestampOffset(timestamp = timestamp, offset = limitOffset))) EasyMock.expect(log.fetchOffsetsByTimestamp(timestamp)).andReturn(Some(TimestampOffset(timestamp = timestamp, offset = limitOffset)))
expectThrottleCallbackAndInvoke(capturedThrottleCallback) val capturedResponse = expectThrottleCallbackAndInvoke()
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log) EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel) val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@ -327,8 +404,6 @@ class KafkaApisTest {
val tp = new TopicPartition("foo", 0) val tp = new TopicPartition("foo", 0)
val limitOffset = 15L val limitOffset = 15L
val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
val replica = EasyMock.mock(classOf[Replica]) val replica = EasyMock.mock(classOf[Replica])
val log = EasyMock.mock(classOf[Log]) val log = EasyMock.mock(classOf[Log])
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica) EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
@ -339,8 +414,7 @@ class KafkaApisTest {
EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log)) EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
EasyMock.expect(log.fetchOffsetsByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)) EasyMock.expect(log.fetchOffsetsByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP))
.andReturn(Some(TimestampOffset(timestamp = ListOffsetResponse.UNKNOWN_TIMESTAMP, offset = limitOffset))) .andReturn(Some(TimestampOffset(timestamp = ListOffsetResponse.UNKNOWN_TIMESTAMP, offset = limitOffset)))
expectThrottleCallbackAndInvoke(capturedThrottleCallback) val capturedResponse = expectThrottleCallbackAndInvoke()
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log) EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel) val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@ -393,14 +467,12 @@ class KafkaApisTest {
* Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively. * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
*/ */
private def updateMetadataCacheWithInconsistentListeners(): (ListenerName, ListenerName) = { private def updateMetadataCacheWithInconsistentListeners(): (ListenerName, ListenerName) = {
import UpdateMetadataRequest.{Broker => UBroker}
import UpdateMetadataRequest.{EndPoint => UEndPoint}
val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val anotherListener = new ListenerName("LISTENER2") val anotherListener = new ListenerName("LISTENER2")
val brokers = Set( val brokers = Set(
new UBroker(0, Seq(new UEndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener), new Broker(0, Seq(new EndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener),
new UEndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT, anotherListener)).asJava, "rack"), new EndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT, anotherListener)).asJava, "rack"),
new UBroker(1, Seq(new UEndPoint("broker1", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava, new Broker(1, Seq(new EndPoint("broker1", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava,
"rack") "rack")
) )
val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
@ -410,10 +482,7 @@ class KafkaApisTest {
} }
private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = { private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = {
val capturedResponse = EasyMock.newCapture[RequestChannel.Response]() val capturedResponse = expectThrottleCallbackAndInvoke()
val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
expectThrottleCallbackAndInvoke(capturedThrottleCallback)
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(clientRequestQuotaManager, requestChannel) EasyMock.replay(clientRequestQuotaManager, requestChannel)
val (metadataRequest, requestChannelRequest) = buildRequest(MetadataRequest.Builder.allTopics, requestListener) val (metadataRequest, requestChannelRequest) = buildRequest(MetadataRequest.Builder.allTopics, requestListener)
@ -426,8 +495,6 @@ class KafkaApisTest {
val tp = new TopicPartition("foo", 0) val tp = new TopicPartition("foo", 0)
val latestOffset = 15L val latestOffset = 15L
val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
val replica = EasyMock.mock(classOf[Replica]) val replica = EasyMock.mock(classOf[Replica])
val log = EasyMock.mock(classOf[Log]) val log = EasyMock.mock(classOf[Log])
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica) EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
@ -435,8 +502,8 @@ class KafkaApisTest {
EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = latestOffset)) EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = latestOffset))
else else
EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = latestOffset)) EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = latestOffset))
expectThrottleCallbackAndInvoke(capturedThrottleCallback)
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) val capturedResponse = expectThrottleCallbackAndInvoke()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log) EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log)
val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel) val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@ -484,7 +551,8 @@ class KafkaApisTest {
AbstractResponse.parseResponse(api, struct) AbstractResponse.parseResponse(api, struct)
} }
private def expectThrottleCallbackAndInvoke(capturedThrottleCallback: Capture[Int => Unit]): Unit = { private def expectThrottleCallbackAndInvoke(): Capture[RequestChannel.Response] = {
val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle( EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle(
EasyMock.anyObject[RequestChannel.Request](), EasyMock.anyObject[RequestChannel.Request](),
EasyMock.capture(capturedThrottleCallback))) EasyMock.capture(capturedThrottleCallback)))
@ -494,6 +562,21 @@ class KafkaApisTest {
callback(0) callback(0)
} }
}) })
val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
capturedResponse
}
private def setupBasicMetadataCache(topic: String, numPartitions: Int = 1): Unit = {
val replicas = List(0.asInstanceOf[Integer]).asJava
val partitionState = new UpdateMetadataRequest.PartitionState(1, 0, 1, replicas, 0, replicas, Collections.emptyList())
val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val broker = new Broker(0, Seq(new EndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava, "rack")
val partitions = (0 until numPartitions).map(new TopicPartition(topic, _) -> partitionState).toMap
val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, partitions.asJava, Set(broker).asJava).build()
metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
} }
} }

23
core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala

@ -59,6 +59,29 @@ class ProduceRequestTest extends BaseRequestTest {
new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1) new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
} }
@Test
def testProduceToNonReplica() {
val topic = "topic"
val partition = 0
// Create a single-partition topic and find a broker which is not the leader
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, 1, servers)
val leader = partitionToLeader(partition)
val nonReplicaOpt = servers.find(_.config.brokerId != leader)
assertTrue(nonReplicaOpt.isDefined)
val nonReplicaId = nonReplicaOpt.get.config.brokerId
// Send the produce request to the non-replica
val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("key".getBytes, "value".getBytes))
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> records)
val produceRequest = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
assertEquals(1, produceResponse.responses.size)
assertEquals(Errors.NOT_LEADER_FOR_PARTITION, produceResponse.responses.asScala.head._2.error)
}
/* returns a pair of partition id and leader id */ /* returns a pair of partition id and leader id */
private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = { private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = {
val partitionToLeader = TestUtils.createTopic(zkClient, topic, 3, 2, servers) val partitionToLeader = TestUtils.createTopic(zkClient, topic, 3, 2, servers)

Loading…
Cancel
Save