diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 44137cf35c3..4a2719e36c7 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 
 import scala.math._
 
@@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
 
-  protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean
-
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
 
   protected def logEndOffset(topicPartition: TopicPartition): Long
@@ -289,7 +287,6 @@ abstract class AbstractFetcherThread(name: String,
                 info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
                   s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
               } catch {
-                case e: FatalExitError => throw e
                 case e: Throwable =>
                   error(s"Error getting offset for partition $topicPartition", e)
                   partitionsWithError += topicPartition
@@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String,
      */
     val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
     if (leaderEndOffset < replicaEndOffset) {
-      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
-      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
-      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!isUncleanLeaderElectionAllowed(topicPartition)) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
-        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
-          s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}")
-        throw new FatalExitError
-      }
-
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
       truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))
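Note on the hunk above: with the unclean-leader-election guard gone, the out-of-range-high path no longer exits the broker. A minimal sketch of how the surviving branch reads after this patch, reconstructed only from the context lines shown; the enclosing method and the branch where the follower is not ahead of the leader are unchanged and elided here:

```scala
// Sketch, not the full method: the follower's log end offset is ahead of the current
// leader's, so the fetcher now just warns and truncates to the leader's latest offset
// instead of raising FatalExitError and shutting the broker down.
val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
if (leaderEndOffset < replicaEndOffset) {
  warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
    s"leader's latest offset $leaderEndOffset")
  truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))
} else {
  // ... handling for the case where the follower is not ahead of the leader (unchanged, elided)
}
```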
- fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " + - s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}") - throw new FatalExitError - } - warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's latest offset $leaderEndOffset") truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset)) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index dc585ebd926..2244771d14c 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String, logAppendInfo } - override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = { replicaMgr.getReplicaOrException(topicPartition).logStartOffset } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5dcd29b473d..bdbadd9b731 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -21,9 +21,8 @@ import java.util.Optional import kafka.api._ import kafka.cluster.BrokerEndPoint -import kafka.log.{LogAppendInfo, LogConfig} +import kafka.log.LogAppendInfo import kafka.server.AbstractFetcherThread.ResultWithPartitions -import kafka.zk.AdminZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.KafkaStorageException @@ -173,12 +172,6 @@ class ReplicaFetcherThread(name: String, "equal or larger than your settings for max.message.bytes, both at a broker and topic level.") } - override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = { - val adminZkClient = new AdminZkClient(replicaMgr.zkClient) - LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig( - ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable - } - override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = { try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala deleted file mode 100644 index 392c912b25a..00000000000 --- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
deleted file mode 100644
index 392c912b25a..00000000000
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.cluster.BrokerEndPoint
-import kafka.utils.{Exit, TestUtils}
-import kafka.utils.TestUtils.createBrokerConfigs
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.Records
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.utils.Time
-import org.junit.{After, Test}
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
-
-  private var brokers: Seq[KafkaServer] = null
-  @volatile private var shutdownCompleted = false
-
-  @After
-  override def tearDown() {
-    Exit.resetExitProcedure()
-    TestUtils.shutdownServers(brokers)
-    super.tearDown()
-  }
-
-  /**
-   * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal
-   * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
-   * when the shutdown hook is invoked and hence this test.
-   */
-  @Test
-  def testFatalErrorInAddPartitions(): Unit = {
-
-    // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
-    // the metadata is propagated.
-    def createTopic(topic: String): Unit = {
-      adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    }
-
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit =
-          super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
-      }
-    }))
-    createTopic("topic")
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  /**
-   * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a
-   * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
-   * when the shutdown hook is invoked and hence this test.
-   */
-  @Test
-  def testFatalErrorInProcessFetchRequest(): Unit = {
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
-          fetchRequest.fetchData.asScala.keys.toSeq.map { tp =>
-            (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              FetchResponse.INVALID_LOG_START_OFFSET, null, null))
-          }
-        }
-      }
-    }))
-    TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers)
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint,
-                                         replicaManager: ReplicaManager, metrics: Metrics, time: Time,
-                                         quotaManager: ReplicationQuotaManager)
-
-  private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = {
-    val time = Time.SYSTEM
-    val server = new KafkaServer(config, time) {
-
-      override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
-        new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
-
-          override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
-                                                             quotaManager: ReplicationQuotaManager) =
-            new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
-              override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-                val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
-                val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
-                fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
-                  time, quotaManager))
-              }
-            }
-        }
-      }
-
-    }
-
-    Exit.setExitProcedure { (_, _) =>
-      import scala.concurrent.ExecutionContext.Implicits._
-      // Run in a separate thread like shutdown hooks
-      Future {
-        server.shutdown()
-        shutdownCompleted = true
-      }
-      // Sleep until interrupted to emulate the fact that `System.exit()` never returns
-      Thread.sleep(Long.MaxValue)
-      throw new AssertionError
-    }
-    server.startup()
-    server
-  }
-
-}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 8c1d95a4356..7a7aeb3efb0 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -151,10 +151,10 @@ class AbstractFetcherThreadTest {
     assertEquals(leaderState.highWatermark, replicaState.highWatermark)
   }
 
-  @Test(expected = classOf[FatalExitError])
-  def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit = {
+  @Test
+  def testFollowerFetchOutOfRangeHigh(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false)
+    val fetcher = new MockFetcherThread()
 
     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
@@ -185,6 +185,10 @@ class AbstractFetcherThreadTest {
     leaderState.highWatermark = 0L
 
     fetcher.doWork()
+
+    assertEquals(0L, replicaState.logEndOffset)
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(0L, replicaState.highWatermark)
   }
 
   @Test
@@ -275,9 +279,7 @@ class AbstractFetcherThreadTest {
     }
   }
 
-  class MockFetcherThread(val replicaId: Int = 0,
-                          val leaderId: Int = 1,
-                          isUncleanLeaderElectionAllowed: Boolean = true)
+  class MockFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1)
     extends AbstractFetcherThread("mock-fetcher", clientId = "mock-fetcher",
       sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt())) {
 
@@ -380,10 +382,6 @@ class AbstractFetcherThreadTest {
       ResultWithPartitions(Some(fetchRequest), Set.empty)
     }
 
-    override def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
-      isUncleanLeaderElectionAllowed
-    }
-
    override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
       val state = replicaPartitionState(topicPartition)
       state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))
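A note on the renamed test: instead of expecting `FatalExitError`, `testFollowerFetchOutOfRangeHigh` now drives the mock fetcher and asserts on the follower's state after `doWork()`. A condensed sketch of the intent follows; it is a fragment, not a standalone test, and relies on the existing `MockFetcherThread`, `mkBatch` and partition-state helpers plus setup lines that sit in unchanged context elided from the hunks above:

```scala
// Fragment only. The replica starts ahead of the leader; after doWork() the expectation is
// that the follower's log was reset to the leader's latest offset (0 in this setup) rather
// than the broker being shut down via FatalExitError as the old test asserted.
val fetcher = new MockFetcherThread()
// ... replica and leader partition state are prepared as in the unchanged test context ...
fetcher.doWork()

assertEquals(0L, replicaState.logEndOffset)
assertEquals(0L, replicaState.logStartOffset)
assertEquals(0L, replicaState.highWatermark)
```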