Browse Source

KAFKA-15511: Handle CorruptIndexException in RemoteIndexCache (#14459)

A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
pull/8860/merge
iit2009060 1 year ago committed by GitHub
parent
commit
13b119aa62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
  2. 3
      storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java

28
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala

@ -32,7 +32,7 @@ import org.mockito.ArgumentMatchers.any @@ -32,7 +32,7 @@ import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}
import java.io.{File, FileInputStream, IOException}
import java.io.{File, FileInputStream, IOException, PrintWriter}
import java.nio.file.Files
import java.util
import java.util.Collections
@ -524,6 +524,23 @@ class RemoteIndexCacheTest { @@ -524,6 +524,23 @@ class RemoteIndexCacheTest {
}
}
@Test
def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
// create Corrupt Offset Index File
createCorruptRemoteIndexCacheOffsetFile()
val entry = cache.getIndexEntry(rlsMetadata)
// Test would fail if it throws corrupt Exception
val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
val offsetIndexFile = entry.offsetIndex.file().toPath
assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
// assert that parent directory for the index files is correct
assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
// file is corrupted it should fetch from remote storage again
verifyFetchIndexInvocation(count = 1)
}
private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
= RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
@ -598,4 +615,13 @@ class RemoteIndexCacheTest { @@ -598,4 +615,13 @@ class RemoteIndexCacheTest {
timeIndex.flush()
}
}
private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
val pw = new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
pw.write("Hello, world")
// The size of the string written in the file is 12 bytes,
// but it should be multiple of Offset Index EntrySIZE which is equal to 8.
pw.close()
}
}

3
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java

@ -21,7 +21,6 @@ import com.github.benmanes.caffeine.cache.Caffeine; @@ -21,7 +21,6 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@ -310,7 +309,7 @@ public class RemoteIndexCache implements Closeable { @@ -310,7 +309,7 @@ public class RemoteIndexCache implements Closeable {
if (Files.exists(indexFile.toPath())) {
try {
index = readIndex.apply(indexFile);
} catch (CorruptRecordException ex) {
} catch (CorruptIndexException ex) {
log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), ex);
}
}

Loading…
Cancel
Save