
KAFKA-1697; Remove support for producer ack > 1 on the broker; reviewed by Joel Koshy

Gwen Shapira authored 10 years ago; committed by Joel Koshy
commit eab4f4c9f4 (branch ref: pull/1442/head)
8 changed files, 173 changed lines in total:

  1. clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java (25)
  2. clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java (12)
  3. clients/src/main/java/org/apache/kafka/common/protocol/Errors.java (19)
  4. core/src/main/scala/kafka/cluster/Partition.scala (2)
  5. core/src/main/scala/kafka/server/KafkaApis.scala (2)
  6. core/src/main/scala/kafka/server/ReplicaManager.scala (85)
  7. core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (2)
  8. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (26)
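
Taken together, the diffs below narrow the broker to the three acks values that remain meaningful (0, 1, and -1) and introduce error code 21, INVALID_REQUIRED_ACKS, for anything else. A minimal sketch of the accepted values, mirroring the isValidRequiredAcks helper added in ReplicaManager.scala (the function name here is illustrative, not part of the patch):

    // 0  -> fire and forget: the broker sends no acknowledgement
    // 1  -> acknowledged once the leader has appended the messages
    // -1 -> acknowledged once all in-sync replicas have the messages
    def isSupportedAcks(requiredAcks: Short): Boolean =
      requiredAcks == 0 || requiredAcks == 1 || requiredAcks == -1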

clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java (25 changed lines)

@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InvalidRequiredAcksException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidRequiredAcksException(String message) {
+        super(message);
+    }
+}
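
Note that the new exception extends ApiException rather than RetriableException, so a client that receives error code 21 treats the send as a hard failure instead of retrying: resending the same request could never reach an acks setting the broker accepts. A small client-side illustration:

    import org.apache.kafka.common.errors.{InvalidRequiredAcksException, RetriableException}

    val failure = new InvalidRequiredAcksException(
      "Produce request specified an invalid value for required acks.")
    assert(!failure.isInstanceOf[RetriableException])  // non-retriable by design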

clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java (12 changed lines)

@@ -23,20 +23,8 @@ package org.apache.kafka.common.errors;
 public class NotEnoughReplicasAfterAppendException extends RetriableException {
     private static final long serialVersionUID = 1L;
 
-    public NotEnoughReplicasAfterAppendException() {
-        super();
-    }
-
-    public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
     public NotEnoughReplicasAfterAppendException(String message) {
         super(message);
     }
-
-    public NotEnoughReplicasAfterAppendException(Throwable cause) {
-        super(cause);
-    }
 }

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java (19 changed lines)

@@ -19,21 +19,7 @@ package org.apache.kafka.common.protocol;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.NetworkException;
-import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
-import org.apache.kafka.common.errors.NotEnoughReplicasException;
-import org.apache.kafka.common.errors.NotLeaderForPartitionException;
-import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
-import org.apache.kafka.common.errors.RecordBatchTooLargeException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.*;
 
 /**
  * This class contains all the client-server errors--those errors that must be sent from the server to the client. These

@@ -70,7 +56,8 @@ public enum Errors {
     NOT_ENOUGH_REPLICAS(19,
             new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
     NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
-            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required."));
+            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
+    INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
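
The two private maps at the end of the hunk back the enum's existing lookup helpers, so the new entry participates in the same round trip as every other error: the broker serializes a short code and the client maps it back to an exception. A sketch of that round trip, assuming the forCode and exception helpers already present in this class:

    import org.apache.kafka.common.protocol.Errors

    val code: Short = Errors.INVALID_REQUIRED_ACKS.code  // 21 on the wire
    val err = Errors.forCode(code)                       // reverse lookup on the client
    println(err.exception.getMessage)                    // the message wrapped above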

core/src/main/scala/kafka/cluster/Partition.scala (2 changed lines)

@@ -317,8 +317,6 @@ class Partition(val topic: String,
         } else {
           (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
         }
-      } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
-        (true, ErrorMapping.NoError)
       } else
         (false, ErrorMapping.NoError)
     case None =>
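
The deleted branch was the last place the broker counted acknowledgements replica by replica for 1 < acks < replica count. With it gone, the surrounding method (checkEnoughReplicasReachOffset in this era of the code) only has to decide the acks = -1 case: wait for the high watermark, then check the in-sync replica count against min.insync.replicas. A simplified sketch of the remaining decision, with names that are illustrative rather than quoted from the file:

    import kafka.common.ErrorMapping

    // Returns (requestSatisfied, errorCode) for an acks = -1 produce.
    def checkSatisfied(hwReachedOffset: Boolean, isrSize: Int, minIsr: Int): (Boolean, Short) =
      if (!hwReachedOffset) (false, ErrorMapping.NoError)             // keep waiting in purgatory
      else if (isrSize >= minIsr) (true, ErrorMapping.NoError)        // fully acknowledged
      else (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)      // appended, but the ISR has shrunk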

core/src/main/scala/kafka/server/KafkaApis.scala (2 changed lines)

@@ -185,7 +185,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // the producer client will know that some error has happened and will refresh its metadata
       if (errorInResponse) {
         info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0"
-          .format(produceRequest.correlationId, produceRequest.clientId))
+              .format(produceRequest.correlationId, produceRequest.clientId))
         requestChannel.closeConnection(request.processor, request)
       } else {
         requestChannel.noOperation(request.processor, request)

core/src/main/scala/kafka/server/ReplicaManager.scala (85 changed lines)

@@ -29,6 +29,8 @@ import kafka.message.{ByteBufferMessageSet, MessageSet}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.{IOException, File}
 import java.util.concurrent.TimeUnit
+import org.apache.kafka.common.protocol.Errors
+
 import scala.Predef._
 import scala.collection._
 import scala.collection.mutable.HashMap

@@ -253,43 +255,66 @@ class ReplicaManager(val config: KafkaConfig,
                      messagesPerPartition: Map[TopicAndPartition, MessageSet],
                      responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
-    val sTime = SystemTime.milliseconds
-    val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
-    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
-
-    val produceStatus = localProduceResults.map{ case (topicAndPartition, result) =>
-      topicAndPartition ->
-              ProducePartitionStatus(
-                result.info.lastOffset + 1, // required offset
-                ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status
-    }
-
-    if(requiredAcks == 0 ||
-      requiredAcks == 1 ||
-      messagesPerPartition.size <= 0 ||
-      localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) {
-      // in case of the following we can respond immediately:
-      //
-      // 1. required acks = 0 or 1
-      // 2. there is no data to append
-      // 3. all partition appends have failed
-      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
-      responseCallback(produceResponseStatus)
-    } else {
-      // create delayed produce operation
-      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
-      val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
-
-      // try to complete the request immediately, otherwise put it into the purgatory
-      // this is because while the delayed produce operation is being created, new
-      // requests may arrive and hence make this operation completable.
-      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+    if (isValidRequiredAcks(requiredAcks)) {
+      val sTime = SystemTime.milliseconds
+      val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
+      debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
+      val produceStatus = localProduceResults.map { case (topicAndPartition, result) =>
+        topicAndPartition ->
+                ProducePartitionStatus(
+                  result.info.lastOffset + 1, // required offset
+                  ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status
+      }
+
+      if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
+        // create delayed produce operation
+        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
+        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+
+        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
+        val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+
+        // try to complete the request immediately, otherwise put it into the purgatory
+        // this is because while the delayed produce operation is being created, new
+        // requests may arrive and hence make this operation completable.
+        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+      } else {
+        // we can respond immediately
+        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
+        responseCallback(produceResponseStatus)
+      }
+    } else {
+      // If required.acks is outside accepted range, something is wrong with the client
+      // Just return an error and don't handle the request at all
+      val responseStatus = messagesPerPartition.map {
+        case (topicAndPartition, messageSet) =>
+          (topicAndPartition ->
+                  ProducerResponseStatus(Errors.INVALID_REQUIRED_ACKS.code,
+                    LogAppendInfo.UnknownLogAppendInfo.firstOffset))
+      }
+      responseCallback(responseStatus)
     }
   }
 
+  // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
+  //
+  // 1. required acks = -1
+  // 2. there is data to append
+  // 3. at least one partition append was successful (fewer errors than partitions)
+  private def delayedRequestRequired(requiredAcks: Short, messagesPerPartition: Map[TopicAndPartition, MessageSet],
+                                     localProduceResults: Map[TopicAndPartition, LogAppendResult]): Boolean = {
+    requiredAcks == -1 &&
+    messagesPerPartition.size > 0 &&
+    localProduceResults.values.count(_.error.isDefined) < messagesPerPartition.size
+  }
+
+  private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
+    requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
+  }
+
   /**
    * Append the messages to the local replica logs
   */
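
After this hunk, appendMessages has exactly three outcomes: reject the request outright when acks is not in {-1, 0, 1}, park an acks = -1 produce in the purgatory while replication catches up, or answer immediately. A compact sketch of that dispatch (the enum and function here are illustrative, not part of the patch):

    sealed trait ProducePath
    case object RespondWithInvalidAcks extends ProducePath  // acks outside {-1, 0, 1}
    case object ParkInPurgatory extends ProducePath         // acks = -1, data present, some append succeeded
    case object RespondImmediately extends ProducePath      // acks 0/1, no data, or every append failed

    def choosePath(requiredAcks: Short, partitions: Int, failedAppends: Int): ProducePath =
      if (requiredAcks != -1 && requiredAcks != 0 && requiredAcks != 1) RespondWithInvalidAcks
      else if (requiredAcks == -1 && partitions > 0 && failedAppends < partitions) ParkInPurgatory
      else RespondImmediately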

core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (2 changed lines)

@@ -59,7 +59,7 @@ object SerializationTestUtils {
   private val partitionDataMessage3 = new ByteBufferMessageSet(new Message("fourth message".getBytes))
   private val partitionDataProducerRequestArray = Array(partitionDataMessage0, partitionDataMessage1, partitionDataMessage2, partitionDataMessage3)
 
-  private val topicDataProducerRequest = {
+  val topicDataProducerRequest = {
     val groupedData = Array(topic1, topic2).flatMap(topic =>
       partitionDataProducerRequestArray.zipWithIndex.map
       {

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (26 changed lines)

@@ -17,16 +17,21 @@
 package kafka.server
 
+import kafka.api.{ProducerResponseStatus, SerializationTestUtils, ProducerRequest}
+import kafka.common.TopicAndPartition
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
+import org.apache.kafka.common.protocol.Errors
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
+
+import scala.collection.Map
 
 class ReplicaManagerTest extends JUnit3Suite {
 
   val topic = "test-topic"

@@ -63,4 +68,25 @@ class ReplicaManagerTest extends JUnit3Suite {
     // shutdown the replica manager upon test completion
     rm.shutdown(false)
   }
+
+  @Test
+  def testIllegalRequiredAcks() {
+    val props = TestUtils.createBrokerConfig(1)
+    val config = new KafkaConfig(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val time: MockTime = new MockTime()
+    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)
+
+    def callback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) = {
+      assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS.code)
+    }
+
+    rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
+
+    rm.shutdown(false)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+}
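
The test exercises the new path end to end inside the broker: requiredAcks = 3 never reaches the log, and the response callback sees Errors.INVALID_REQUIRED_ACKS for the partition. For context, this is the kind of client configuration the change outlaws; an old-style producer set up like this (broker address is a placeholder) now gets error 21 back instead of a two-replica wait:

    import java.util.Properties

    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")  // placeholder bootstrap address
    props.put("request.required.acks", "2")              // rejected by brokers with this patch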
