Browse Source

KAFKA-8398: Prevent NPE in `forceUnmap` (#8983)

Without this change, we would catch the NPE and log it.
This was misleading and could cause excessive log
volume.

The NPE can happen after `AlterReplicaLogDirs` completes
successfully and when unmapping older regions. Example
stacktrace:

```text
[2019-05-20 14:08:13,999] ERROR Error unmapping index /tmp/kafka-logs/test-0.567a0d8ff88b45ab95794020d0b2e66f-delete/00000000000000000000.index (kafka.log.OffsetIndex)
java.lang.NullPointerException
at org.apache.kafka.common.utils.MappedByteBuffers.unmap(MappedByteBuffers.java:73)
at kafka.log.AbstractIndex.forceUnmap(AbstractIndex.scala:318)
at kafka.log.AbstractIndex.safeForceUnmap(AbstractIndex.scala:308)
at kafka.log.AbstractIndex.$anonfun$closeHandler$1(AbstractIndex.scala:257)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.log.AbstractIndex.closeHandler(AbstractIndex.scala:257)
at kafka.log.AbstractIndex.deleteIfExists(AbstractIndex.scala:226)
at kafka.log.LogSegment.$anonfun$deleteIfExists$6(LogSegment.scala:597)
at kafka.log.LogSegment.delete$1(LogSegment.scala:585)
at kafka.log.LogSegment.$anonfun$deleteIfExists$5(LogSegment.scala:597)
at kafka.utils.CoreUtils$.$anonfun$tryAll$1(CoreUtils.scala:115)
at kafka.utils.CoreUtils$.$anonfun$tryAll$1$adapted(CoreUtils.scala:114)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.utils.CoreUtils$.tryAll(CoreUtils.scala:114)
at kafka.log.LogSegment.deleteIfExists(LogSegment.scala:599)
at kafka.log.Log.$anonfun$delete$3(Log.scala:1762)
at kafka.log.Log.$anonfun$delete$3$adapted(Log.scala:1762)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at kafka.log.Log.$anonfun$delete$2(Log.scala:1762)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.log.Log.maybeHandleIOException(Log.scala:2013)
at kafka.log.Log.delete(Log.scala:1759)
at kafka.log.LogManager.deleteLogs(LogManager.scala:761)
at kafka.log.LogManager.$anonfun$deleteLogs$6(LogManager.scala:775)
at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Co-authored-by: Jaikiran Pai <jaikiran.pai@gmail.com>
pull/8974/head
Ismael Juma 4 years ago committed by GitHub
parent
commit
8a24da376b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      core/src/main/scala/kafka/log/AbstractIndex.scala

8
core/src/main/scala/kafka/log/AbstractIndex.scala

@ -311,9 +311,11 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: @@ -311,9 +311,11 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset:
}
protected def safeForceUnmap(): Unit = {
try forceUnmap()
catch {
case t: Throwable => error(s"Error unmapping index $file", t)
if (mmap != null) {
try forceUnmap()
catch {
case t: Throwable => error(s"Error unmapping index $file", t)
}
}
}

Loading…
Cancel
Save