From c5ef614bbf133e18bd207e22f2697a2d1d3e8e4e Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 17 Sep 2018 12:36:53 -0700 Subject: [PATCH] KAFKA-7414; Out of range errors should never be fatal for follower (#5654) This patch fixes the inconsistent handling of out of range errors in the replica fetcher. Previously we would raise a fatal error if the follower's offset is ahead of the leader's and unclean leader election is not enabled. The behavior was inconsistent depending on the message format. With KIP-101/KIP-279 and the new message format, upon becoming a follower, the replica would use leader epoch information to reconcile the end of the log with the leader and simply truncate. Additionally, with the old format, the check is not really bulletproof for detecting data loss since the unclean leader's end offset might have already caught up to the follower's offset at the time of its initial fetch or when it queries for the current log end offset. With this patch, we simply skip the unclean leader election check and allow the needed truncation to occur. When the truncation offset is below the high watermark, a warning will be logged. This makes the behavior consistent for all message formats and removes a scenario in which an error on one partition can bring the broker down. Reviewers: Ismael Juma , Jun Rao --- .../kafka/server/AbstractFetcherThread.scala | 17 +- .../server/ReplicaAlterLogDirsThread.scala | 2 - .../kafka/server/ReplicaFetcherThread.scala | 9 +- .../ReplicaFetcherThreadFatalErrorTest.scala | 146 ------------------ .../server/AbstractFetcherThreadTest.scala | 18 +-- 5 files changed, 11 insertions(+), 181 deletions(-) delete mode 100644 core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 44137cf35c3..4a2719e36c7 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong import com.yammer.metrics.core.Gauge import kafka.log.LogAppendInfo import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.internals.{FatalExitError, PartitionStates} +import org.apache.kafka.common.internals.PartitionStates import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} -import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest} +import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse} import scala.math._ @@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String, protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] - protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean - protected def latestEpoch(topicPartition: TopicPartition): Option[Int] protected def logEndOffset(topicPartition: TopicPartition): Long @@ -289,7 +287,6 @@ abstract class AbstractFetcherThread(name: String, info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " + s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset") } catch { - case e: FatalExitError => throw e case e: Throwable => error(s"Error getting offset for partition $topicPartition", e) partitionsWithError += topicPartition @@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String, */ val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition) if (leaderEndOffset < replicaEndOffset) { - // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. - // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, - // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. - if (!isUncleanLeaderElectionAllowed(topicPartition)) { - // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly. - fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " + - s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}") - throw new FatalExitError - } - warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's latest offset $leaderEndOffset") truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset)) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index dc585ebd926..2244771d14c 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String, logAppendInfo } - override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = { replicaMgr.getReplicaOrException(topicPartition).logStartOffset } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5dcd29b473d..bdbadd9b731 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -21,9 +21,8 @@ import java.util.Optional import kafka.api._ import kafka.cluster.BrokerEndPoint -import kafka.log.{LogAppendInfo, LogConfig} +import kafka.log.LogAppendInfo import kafka.server.AbstractFetcherThread.ResultWithPartitions -import kafka.zk.AdminZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.KafkaStorageException @@ -173,12 +172,6 @@ class ReplicaFetcherThread(name: String, "equal or larger than your settings for max.message.bytes, both at a broker and topic level.") } - override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = { - val adminZkClient = new AdminZkClient(replicaMgr.zkClient) - LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig( - ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable - } - override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = { try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala deleted file mode 100644 index 392c912b25a..00000000000 --- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.util.concurrent.atomic.AtomicBoolean - -import kafka.cluster.BrokerEndPoint -import kafka.utils.{Exit, TestUtils} -import kafka.utils.TestUtils.createBrokerConfigs -import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.internals.FatalExitError -import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.Records -import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} -import org.apache.kafka.common.utils.Time -import org.junit.{After, Test} - -import scala.collection.Map -import scala.collection.JavaConverters._ -import scala.concurrent.Future - -class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness { - - private var brokers: Seq[KafkaServer] = null - @volatile private var shutdownCompleted = false - - @After - override def tearDown() { - Exit.resetExitProcedure() - TestUtils.shutdownServers(brokers) - super.tearDown() - } - - /** - * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal - * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks - * when the shutdown hook is invoked and hence this test. - */ - @Test - def testFatalErrorInAddPartitions(): Unit = { - - // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before - // the metadata is propagated. - def createTopic(topic: String): Unit = { - adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - } - - val props = createBrokerConfigs(2, zkConnect) - brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params => - import params._ - new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) { - override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError - override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit = - super.addPartitions(partitionAndOffsets.mapValues(_ => -1)) - } - })) - createTopic("topic") - TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete") - } - - /** - * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a - * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks - * when the shutdown hook is invoked and hence this test. - */ - @Test - def testFatalErrorInProcessFetchRequest(): Unit = { - val props = createBrokerConfigs(2, zkConnect) - brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params => - import params._ - new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) { - override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError - override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = { - fetchRequest.fetchData.asScala.keys.toSeq.map { tp => - (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, null, null)) - } - } - } - })) - TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers) - TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete") - } - - private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint, - replicaManager: ReplicaManager, metrics: Metrics, time: Time, - quotaManager: ReplicationQuotaManager) - - private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = { - val time = Time.SYSTEM - val server = new KafkaServer(config, time) { - - override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = { - new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, - quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) { - - override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], - quotaManager: ReplicationQuotaManager) = - new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) { - override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { - val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("") - val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}" - fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics, - time, quotaManager)) - } - } - } - } - - } - - Exit.setExitProcedure { (_, _) => - import scala.concurrent.ExecutionContext.Implicits._ - // Run in a separate thread like shutdown hooks - Future { - server.shutdown() - shutdownCompleted = true - } - // Sleep until interrupted to emulate the fact that `System.exit()` never returns - Thread.sleep(Long.MaxValue) - throw new AssertionError - } - server.startup() - server - } - -} diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 8c1d95a4356..7a7aeb3efb0 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -151,10 +151,10 @@ class AbstractFetcherThreadTest { assertEquals(leaderState.highWatermark, replicaState.highWatermark) } - @Test(expected = classOf[FatalExitError]) - def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit = { + @Test + def testFollowerFetchOutOfRangeHigh(): Unit = { val partition = new TopicPartition("topic", 0) - val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false) + val fetcher = new MockFetcherThread() val replicaLog = Seq( mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)), @@ -185,6 +185,10 @@ class AbstractFetcherThreadTest { leaderState.highWatermark = 0L fetcher.doWork() + + assertEquals(0L, replicaState.logEndOffset) + assertEquals(0L, replicaState.logStartOffset) + assertEquals(0L, replicaState.highWatermark) } @Test @@ -275,9 +279,7 @@ class AbstractFetcherThreadTest { } } - class MockFetcherThread(val replicaId: Int = 0, - val leaderId: Int = 1, - isUncleanLeaderElectionAllowed: Boolean = true) + class MockFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1) extends AbstractFetcherThread("mock-fetcher", clientId = "mock-fetcher", sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt())) { @@ -380,10 +382,6 @@ class AbstractFetcherThreadTest { ResultWithPartitions(Some(fetchRequest), Set.empty) } - override def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = { - isUncleanLeaderElectionAllowed - } - override def latestEpoch(topicPartition: TopicPartition): Option[Int] = { val state = replicaPartitionState(topicPartition) state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))