diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java new file mode 100644 index 00000000000..9d19b2844ae --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class InvalidRequiredAcksException extends ApiException { + private static final long serialVersionUID = 1L; + + public InvalidRequiredAcksException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java index a6107b81894..fd7f6d8f5c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java @@ -23,20 +23,8 @@ package org.apache.kafka.common.errors; public class NotEnoughReplicasAfterAppendException extends RetriableException { private static final long serialVersionUID = 1L; - public NotEnoughReplicasAfterAppendException() { - super(); - } - - public NotEnoughReplicasAfterAppendException(String message, Throwable cause) { - super(message, cause); - } - public NotEnoughReplicasAfterAppendException(String message) { super(message); } - public NotEnoughReplicasAfterAppendException(Throwable cause) { - super(cause); - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index a8deac4ce51..ad2171f5417 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -19,21 +19,7 @@ package org.apache.kafka.common.protocol; import java.util.HashMap; import java.util.Map; -import org.apache.kafka.common.errors.ApiException; -import org.apache.kafka.common.errors.CorruptRecordException; -import org.apache.kafka.common.errors.InvalidTopicException; -import org.apache.kafka.common.errors.LeaderNotAvailableException; -import org.apache.kafka.common.errors.NetworkException; -import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException; -import org.apache.kafka.common.errors.NotEnoughReplicasException; -import org.apache.kafka.common.errors.NotLeaderForPartitionException; -import org.apache.kafka.common.errors.OffsetMetadataTooLarge; -import org.apache.kafka.common.errors.OffsetOutOfRangeException; -import org.apache.kafka.common.errors.RecordBatchTooLargeException; -import org.apache.kafka.common.errors.RecordTooLargeException; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.errors.*; /** * This class contains all the client-server errors--those errors that must be sent from the server to the client. These @@ -70,7 +56,8 @@ public enum Errors { NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")), NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, - new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")); + new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")), + INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")); private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e6ad8be5e33..bfe4f45486b 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -317,8 +317,6 @@ class Partition(val topic: String, } else { (true, ErrorMapping.NotEnoughReplicasAfterAppendCode) } - } else if (requiredAcks > 0 && numAcks >= requiredAcks) { - (true, ErrorMapping.NoError) } else (false, ErrorMapping.NoError) case None => diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6ee7d8819a9..703886a1d48 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -185,7 +185,7 @@ class KafkaApis(val requestChannel: RequestChannel, // the producer client will know that some error has happened and will refresh its metadata if (errorInResponse) { info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" - .format(produceRequest.correlationId, produceRequest.clientId)) + .format(produceRequest.correlationId, produceRequest.clientId)) requestChannel.closeConnection(request.processor, request) } else { requestChannel.noOperation(request.processor, request) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fb948b9ab28..ce36cc72606 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -29,6 +29,8 @@ import kafka.message.{ByteBufferMessageSet, MessageSet} import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} import java.util.concurrent.TimeUnit +import org.apache.kafka.common.protocol.Errors + import scala.Predef._ import scala.collection._ import scala.collection.mutable.HashMap @@ -253,43 +255,66 @@ class ReplicaManager(val config: KafkaConfig, messagesPerPartition: Map[TopicAndPartition, MessageSet], responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { - val sTime = SystemTime.milliseconds - val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) - debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) + if (isValidRequiredAcks(requiredAcks)) { - val produceStatus = localProduceResults.map{ case (topicAndPartition, result) => - topicAndPartition -> - ProducePartitionStatus( - result.info.lastOffset + 1, // required offset - ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status - } + val sTime = SystemTime.milliseconds + val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) + debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) - if(requiredAcks == 0 || - requiredAcks == 1 || - messagesPerPartition.size <= 0 || - localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) { - // in case of the following we can respond immediately: - // - // 1. required acks = 0 or 1 - // 2. there is no data to append - // 3. all partition appends have failed - val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) - responseCallback(produceResponseStatus) - } else { - // create delayed produce operation - val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) - val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) + val produceStatus = localProduceResults.map { case (topicAndPartition, result) => + topicAndPartition -> + ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status + } - // create a list of (topic, partition) pairs to use as keys for this delayed produce operation - val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq + if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) { + // create delayed produce operation + val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) + val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) - // try to complete the request immediately, otherwise put it into the purgatory - // this is because while the delayed produce operation is being created, new - // requests may arrive and hence make this operation completable. - delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + // create a list of (topic, partition) pairs to use as keys for this delayed produce operation + val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq + + // try to complete the request immediately, otherwise put it into the purgatory + // this is because while the delayed produce operation is being created, new + // requests may arrive and hence make this operation completable. + delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + + } else { + // we can respond immediately + val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) + responseCallback(produceResponseStatus) + } + } else { + // If required.acks is outside accepted range, something is wrong with the client + // Just return an error and don't handle the request at all + val responseStatus = messagesPerPartition.map { + case (topicAndPartition, messageSet) => + (topicAndPartition -> + ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code, + LogAppendInfo.UnknownLogAppendInfo.firstOffset)) + } + responseCallback(responseStatus) } } + // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete + // + // 1. required acks = -1 + // 2. there is data to append + // 3. at least one partition append was successful (fewer errors than partitions) + private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet], + localProduceResults: Map[TopicAndPartition, LogAppendResult]): Boolean = { + requiredAcks == -1 && + messagesPerPartition.size > 0 && + localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size + } + + private def isValidRequiredAcks(requiredAcks: Short): Boolean = { + requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0 + } + /** * Append the messages to the local replica logs */ diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index a1f72f8c204..fba852afa1b 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -59,7 +59,7 @@ object SerializationTestUtils { private val partitionDataMessage3 = new ByteBufferMessageSet(new Message("fourth message".getBytes)) private val partitionDataProducerRequestArray = Array(partitionDataMessage0, partitionDataMessage1, partitionDataMessage2, partitionDataMessage3) - private val topicDataProducerRequest = { + val topicDataProducerRequest = { val groupedData = Array(topic1, topic2).flatMap(topic => partitionDataProducerRequestArray.zipWithIndex.map { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index faa907131ed..d1ed5c2c506 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -17,16 +17,21 @@ package kafka.server +import kafka.api.{ProducerResponseStatus, SerializationTestUtils, ProducerRequest} +import kafka.common.TopicAndPartition import kafka.utils.{MockScheduler, MockTime, TestUtils} import java.util.concurrent.atomic.AtomicBoolean import java.io.File +import org.apache.kafka.common.protocol.Errors import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite import org.junit.Test +import scala.collection.Map + class ReplicaManagerTest extends JUnit3Suite { val topic = "test-topic" @@ -63,4 +68,25 @@ class ReplicaManagerTest extends JUnit3Suite { // shutdown the replica manager upon test completion rm.shutdown(false) } + + @Test + def testIllegalRequiredAcks() { + val props = TestUtils.createBrokerConfig(1) + val config = new KafkaConfig(props) + val zkClient = EasyMock.createMock(classOf[ZkClient]) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val time: MockTime = new MockTime() + val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) + val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest) + def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = { + assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code) + } + + rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback) + + rm.shutdown(false); + + TestUtils.verifyNonDaemonThreadsStatus + + } }