
KAFKA-7414; Out of range errors should never be fatal for follower (#5654)

This patch fixes the inconsistent handling of out-of-range errors in the replica fetcher. Previously, we would raise a fatal error if the follower's offset was ahead of the leader's and unclean leader election was not enabled. The behavior was inconsistent across message formats: with KIP-101/KIP-279 and the new message format, a replica that becomes a follower uses leader epoch information to reconcile the end of its log with the leader and simply truncates. Additionally, with the old format, the check was not really bulletproof for detecting data loss, since the unclean leader's end offset might already have caught up to the follower's offset by the time of the initial fetch or of the query for the current log end offset.
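
For reference, the epoch-based reconciliation path works roughly as sketched below. This is a minimal sketch only; the trait and every method in it (latestLocalEpoch, fetchEpochEndOffsetFromLeader, localLogEndOffset, truncateTo) are hypothetical stand-ins for the fetcher/log internals, not the actual Kafka API:

    import org.apache.kafka.common.TopicPartition

    // Simplified sketch of KIP-101/KIP-279-style truncation when a replica becomes a follower.
    // All members here are hypothetical placeholders, not real Kafka methods.
    trait EpochReconciliationSketch {
      def latestLocalEpoch(tp: TopicPartition): Option[Int]                    // latest epoch in the local log, if any
      def localLogEndOffset(tp: TopicPartition): Long
      def fetchEpochEndOffsetFromLeader(tp: TopicPartition, epoch: Int): Long  // OffsetsForLeaderEpoch-style lookup
      def truncateTo(tp: TopicPartition, offset: Long): Unit

      def reconcileWithLeader(tp: TopicPartition): Unit =
        latestLocalEpoch(tp) match {
          case Some(epoch) =>
            // Ask the leader where this epoch ends and truncate any divergent suffix of the local log.
            val leaderEpochEndOffset = fetchEpochEndOffsetFromLeader(tp, epoch)
            truncateTo(tp, math.min(leaderEpochEndOffset, localLogEndOffset(tp)))
          case None =>
            // Old message format: no epoch information is available, so the follower falls back
            // to the offset-based out-of-range handling that this patch changes.
            ()
        }
    }

With the old message format no epoch information exists, which is why the offset-based fallback still matters.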

With this patch, we simply skip the unclean leader election check and allow the needed truncation to occur. When the truncation offset is below the high watermark, a warning will be logged. This makes the behavior consistent for all message formats and removes a scenario in which an error on one partition can bring the broker down.
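
The resulting out-of-range handling when the follower is ahead of the leader looks roughly like the sketch below, which mirrors the AbstractFetcherThread hunk in this diff; the trait and its abstract members are illustrative stand-ins for the fetcher internals, not the real method signatures:

    import org.apache.kafka.common.TopicPartition

    // Sketch of the post-patch handling when the follower's offset is ahead of the leader's.
    // The abstract members are assumed stand-ins for fetcher/log internals.
    trait OutOfRangeHandlingSketch {
      def fetchLatestOffsetFromLeader(tp: TopicPartition): Long
      def highWatermark(tp: TopicPartition): Long
      def truncateTo(tp: TopicPartition, offset: Long): Unit
      def warn(msg: String): Unit

      def resetFetchOffset(tp: TopicPartition, replicaEndOffset: Long): Long = {
        val leaderEndOffset = fetchLatestOffsetFromLeader(tp)
        if (leaderEndOffset < replicaEndOffset) {
          // No unclean-leader-election check and no fatal exit: truncate and keep fetching.
          if (leaderEndOffset < highWatermark(tp))
            warn(s"Truncating $tp to $leaderEndOffset, below the high watermark ${highWatermark(tp)}")
          truncateTo(tp, leaderEndOffset)
          leaderEndOffset
        } else {
          // The follower is not actually ahead of the leader; continue from its current end offset.
          replicaEndOffset
        }
      }
    }

Truncating below the high watermark is now logged at warn level rather than treated as fatal, which keeps a single partition from taking the whole broker down.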

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Jason Gustafson authored 6 years ago, committed by GitHub
parent commit c5ef614bbf
  1. core/src/main/scala/kafka/server/AbstractFetcherThread.scala (17 changes)
  2. core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala (2 changes)
  3. core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (9 changes)
  4. core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala (146 changes)
  5. core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala (18 changes)

core/src/main/scala/kafka/server/AbstractFetcherThread.scala (17 changes)

@@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}

 import scala.math._
@@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String,
   protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]

-  protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean
-
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]

   protected def logEndOffset(topicPartition: TopicPartition): Long
@@ -289,7 +287,6 @@ abstract class AbstractFetcherThread(name: String,
           info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " +
             s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset")
         } catch {
-          case e: FatalExitError => throw e
           case e: Throwable =>
             error(s"Error getting offset for partition $topicPartition", e)
             partitionsWithError += topicPartition
@@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String,
       */
     val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
     if (leaderEndOffset < replicaEndOffset) {
-      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
-      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
-      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!isUncleanLeaderElectionAllowed(topicPartition)) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
-        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
-          s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}")
-        throw new FatalExitError
-      }
-
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
       truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))

core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala (2 changes)

@@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String,
     logAppendInfo
   }

-  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true
-
   override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = {
     replicaMgr.getReplicaOrException(topicPartition).logStartOffset
   }

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (9 changes)

@@ -21,9 +21,8 @@ import java.util.Optional
 import kafka.api._
 import kafka.cluster.BrokerEndPoint
-import kafka.log.{LogAppendInfo, LogConfig}
+import kafka.log.LogAppendInfo
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.zk.AdminZkClient
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -173,12 +172,6 @@ class ReplicaFetcherThread(name: String,
       "equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
   }

-  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
-    val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
-    LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
-      ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable
-  }
-
   override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)

core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala (146 changes, file deleted)

@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.cluster.BrokerEndPoint
-import kafka.utils.{Exit, TestUtils}
-import kafka.utils.TestUtils.createBrokerConfigs
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.Records
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.utils.Time
-import org.junit.{After, Test}
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
-
-  private var brokers: Seq[KafkaServer] = null
-  @volatile private var shutdownCompleted = false
-
-  @After
-  override def tearDown() {
-    Exit.resetExitProcedure()
-    TestUtils.shutdownServers(brokers)
-    super.tearDown()
-  }
-
-  /**
-   * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal
-   * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
-   * when the shutdown hook is invoked and hence this test.
-   */
-  @Test
-  def testFatalErrorInAddPartitions(): Unit = {
-
-    // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
-    // the metadata is propagated.
-    def createTopic(topic: String): Unit = {
-      adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    }
-
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit =
-          super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
-      }
-    }))
-
-    createTopic("topic")
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  /**
-   * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and if a
-   * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
-   * when the shutdown hook is invoked and hence this test.
-   */
-  @Test
-  def testFatalErrorInProcessFetchRequest(): Unit = {
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, PD)] = {
-          fetchRequest.fetchData.asScala.keys.toSeq.map { tp =>
-            (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              FetchResponse.INVALID_LOG_START_OFFSET, null, null))
-          }
-        }
-      }
-    }))
-
-    TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers)
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint,
-                                         replicaManager: ReplicaManager, metrics: Metrics, time: Time,
-                                         quotaManager: ReplicationQuotaManager)
-
-  private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = {
-    val time = Time.SYSTEM
-    val server = new KafkaServer(config, time) {
-
-      override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
-        new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
-
-          override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
-                                                             quotaManager: ReplicationQuotaManager) =
-            new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
-              override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-                val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
-                val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
-                fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
-                  time, quotaManager))
-              }
-            }
-        }
-      }
-    }
-
-    Exit.setExitProcedure { (_, _) =>
-      import scala.concurrent.ExecutionContext.Implicits._
-      // Run in a separate thread like shutdown hooks
-      Future {
-        server.shutdown()
-        shutdownCompleted = true
-      }
-      // Sleep until interrupted to emulate the fact that `System.exit()` never returns
-      Thread.sleep(Long.MaxValue)
-      throw new AssertionError
-    }
-
-    server.startup()
-    server
-  }
-
-}

core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala (18 changes)

@@ -151,10 +151,10 @@ class AbstractFetcherThreadTest {
     assertEquals(leaderState.highWatermark, replicaState.highWatermark)
   }

-  @Test(expected = classOf[FatalExitError])
-  def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit = {
+  @Test
+  def testFollowerFetchOutOfRangeHigh(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false)
+    val fetcher = new MockFetcherThread()

     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
@@ -185,6 +185,10 @@ class AbstractFetcherThreadTest {
     leaderState.highWatermark = 0L

     fetcher.doWork()
+
+    assertEquals(0L, replicaState.logEndOffset)
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(0L, replicaState.highWatermark)
   }

   @Test
@Test @Test
@@ -275,9 +279,7 @@ class AbstractFetcherThreadTest {
     }
   }

-  class MockFetcherThread(val replicaId: Int = 0,
-                          val leaderId: Int = 1,
-                          isUncleanLeaderElectionAllowed: Boolean = true)
+  class MockFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1)
     extends AbstractFetcherThread("mock-fetcher",
       clientId = "mock-fetcher",
       sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt())) {
@@ -380,10 +382,6 @@ class AbstractFetcherThreadTest {
       ResultWithPartitions(Some(fetchRequest), Set.empty)
     }

-    override def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = {
-      isUncleanLeaderElectionAllowed
-    }
-
     override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
       val state = replicaPartitionState(topicPartition)
       state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))
