
KAFKA-15481: Fix concurrency bug in RemoteIndexCache (#14483)

RemoteIndexCache has a concurrency bug which leads to an IOException while fetching data from the remote tier.

The bug can be reproduced with the following order of events:

Thread 1 (cache thread): invalidates the entry; the removalListener is invoked asynchronously, so the files have not been renamed with the "deleted" suffix yet.
Thread 2 (fetch thread): tries to find the entry in the cache, does not find it because it was removed by Thread 1, fetches the entry from S3, and writes it over the existing file (replacing the existing file).
Thread 1: the async removalListener runs, acquires a lock on the old entry (already removed from the cache), renames the files with the "deleted" suffix, and starts deleting them.
Thread 2: tries to create the in-memory/mmapped index, does not find the file, and therefore creates a new 2GB file in the AbstractIndex constructor. The JVM returns an error because it will not allow creation of a 2GB random-access file. A sketch of the unsafe pattern follows.
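
For illustration only, a minimal sketch of the racy pre-fix pattern (not the actual Kafka code; class, key, and file names here are hypothetical): Caffeine delivers the removalListener on an executor after the mapping is already gone, so the listener's rename can land on a file that a fetch thread has just recreated for a new entry.

    // Hypothetical sketch, assuming Caffeine on the classpath and Java 11+.
    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class RemovalListenerRaceSketch {
        public static void main(String[] args) throws Exception {
            Path dir = Files.createTempDirectory("remote-index-cache");
            Cache<String, Path> cache = Caffeine.newBuilder()
                    // Pre-fix pattern: the listener runs asynchronously, AFTER removal.
                    .removalListener((String key, Path file, RemovalCause cause) -> {
                        try {
                            // By the time this runs, a fetch thread may have recreated
                            // "file" for a brand-new entry; renaming it here pulls the
                            // new entry's data out from under that thread.
                            Files.move(file, file.resolveSibling(file.getFileName() + ".deleted"));
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    })
                    .build();

            Path file = Files.createFile(dir.resolve("00000.index"));
            cache.put("segment-0", file);
            cache.invalidate("segment-0"); // listener is now scheduled, not yet run
            // A fetch thread sees the key absent, re-downloads, rewrites the same
            // path, and then the late listener renames/deletes it underneath.
        }
    }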

This commit fixes the bug by using an evictionListener instead of a removalListener, so that eviction-driven cleanup runs atomically with the removal of the entry from the cache, including the file rename. Manual removal (which the evictionListener does not see) is handled with computeIfPresent(), enforcing that the cache removal and the file rename happen atomically as well.
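
The resulting pattern, as a minimal sketch (assuming Caffeine 3.x and Java 17; Entry, enqueueEntryForCleanup, and the weights stand in for the real members of RemoteIndexCache):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    final class AtomicRemovalSketch {
        record Entry(long entrySizeBytes) {}

        private final Cache<String, Entry> internalCache = Caffeine.newBuilder()
                .maximumWeight(1024 * 1024)
                .weigher((String key, Entry entry) -> (int) entry.entrySizeBytes())
                // Runs as part of the atomic eviction of the entry, so a concurrent
                // lookup cannot observe the key absent before the files have been
                // renamed for cleanup.
                .evictionListener((String key, Entry entry, RemovalCause cause) -> {
                    if (entry != null) {
                        enqueueEntryForCleanup(entry, key);
                    }
                })
                .build();

        // Manual removals bypass the evictionListener (EXPLICIT cause), so the same
        // cleanup runs inside computeIfPresent(), which the ConcurrentMap view
        // executes atomically for the key.
        void remove(String key) {
            internalCache.asMap().computeIfPresent(key, (k, v) -> {
                enqueueEntryForCleanup(v, k);
                return null; // returning null removes the mapping
            });
        }

        private void enqueueEntryForCleanup(Entry entry, String key) {
            // In the real cache: entry.markForCleanup() renames the index files to
            // the ".deleted" suffix, then the entry is offered to the cleaner
            // thread's queue for actual deletion.
        }
    }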

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Arpit Goyal <goyal.arpit.91@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Jotaniya Jeel authored 11 months ago, committed by GitHub
commit 4612fe42af
2 changed files:

  core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala (139 lines changed)
  storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java (42 lines changed)

core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala

@@ -23,7 +23,7 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.RemoteIndexCache.{Entry, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
+import org.apache.kafka.storage.internals.log.RemoteIndexCache.{DIR_NAME, Entry, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
 import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
@@ -32,6 +32,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
+import org.mockito.invocation.InvocationOnMock
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
@@ -39,7 +40,7 @@ import java.io.{File, FileInputStream, IOException, PrintWriter}
 import java.nio.file.{Files, NoSuchFileException, Paths}
 import java.util
 import java.util.{Collections, Optional}
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.{CountDownLatch, Executors, Future, TimeUnit}
 import scala.collection.mutable

 class RemoteIndexCacheTest {
@@ -138,8 +139,8 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -249,7 +250,7 @@ class RemoteIndexCacheTest {
   }

   @Test
-  def testCacheEntryIsDeletedOnInvalidation(): Unit = {
+  def testCacheEntryIsDeletedOnRemoval(): Unit = {
     def getIndexFileFromDisk(suffix: String) = {
       Files.walk(tpDir.toPath)
         .filter(Files.isRegularFile(_))
@@ -271,8 +272,8 @@ class RemoteIndexCacheTest {
     // no expired entries yet
     assertEquals(0, cache.expiredIndexes.size, "expiredIndex queue should be zero at start of test")

-    // invalidate the cache. it should async mark the entry for removal
-    cache.internalCache.invalidate(internalIndexKey)
+    // call remove function to mark the entry for removal
+    cache.remove(internalIndexKey)

     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
@@ -672,6 +673,84 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }

+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Calling the actual markForCleanup() method to start renaming the files
+        invocation.callRealMethod()
+        // Signal TestWait to unblock itself so that the test can be completed
+        latchForTestWait.countDown()
+      }
+    }).when(spyEntry).markForCleanup()
+
+    val removeCache = (() => {
+      cache.remove(rlsMetadata.remoteLogSegmentId().id())
+    }): Runnable
+
+    val readCache = (() => {
+      // Wait for signal to start CacheRead
+      latchForCacheRead.await()
+      entry = cache.getIndexEntry(rlsMetadata)
+      // Signal the CacheRemove to start renaming the files
+      latchForCacheRemove.countDown()
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      val removeCacheFuture: Future[_] = executor.submit(removeCache: Runnable)
+      val readCacheFuture: Future[_] = executor.submit(readCache: Runnable)
+
+      // Verify both tasks are completed without any exception
+      removeCacheFuture.get()
+      readCacheFuture.get()
+
+      // Wait for signal to complete the test
+      latchForTestWait.await()
+
+      // We can't determine whether the read thread or the remove thread wins the race, so:
+      // 1. If the read thread goes first, the cache file should not exist and the cache size should be zero.
+      // 2. If the remove thread goes first, the cache file should be present and the cache size should be one.
+      // Either way, we verify that the cache entry and its backing file are present or absent together.
+      if (getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent) {
+        assertCacheSize(1)
+      } else {
+        assertCacheSize(0)
+      }
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
   @Test
   def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
     reset(rsm)
@@ -679,9 +758,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         // Create corrupted index file
         createCorruptTimeIndexOffsetFile(tpDir)
@@ -717,9 +796,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -764,7 +843,7 @@ class RemoteIndexCacheTest {
     Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
     Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))

-    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+    cache.remove(rlsMetadata.remoteLogSegmentId().id())

     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
@@ -792,9 +871,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         // Create corrupt index file return from RSM
         createCorruptedIndexFile(testIndexType, tpDir)
@@ -839,7 +918,7 @@ class RemoteIndexCacheTest {
     // verify deleted file exists on disk
     assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")

-    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+    cache.remove(rlsMetadata.remoteLogSegmentId().id())

     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
@@ -862,9 +941,9 @@ class RemoteIndexCacheTest {
                             = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
       time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
-    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
-    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
-    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, tpDir))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, tpDir))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, tpDir))
     spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
   }
@@ -892,8 +971,8 @@ class RemoteIndexCacheTest {
     }
   }

-  private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex = {
-    val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata)
+  private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TransactionIndex = {
+    val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
     txnIdxFile.createNewFile()
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
@@ -914,14 +993,14 @@ class RemoteIndexCacheTest {
     return new TransactionIndex(100L, txnIdxFile)
   }

-  private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
+  private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TimeIndex = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
+    new TimeIndex(remoteTimeIndexFile(dir, metadata), metadata.startOffset(), maxEntries * 12)
   }

-  private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata) = {
+  private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File) = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new OffsetIndex(remoteOffsetIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 8)
+    new OffsetIndex(remoteOffsetIndexFile(dir, metadata), metadata.startOffset(), maxEntries * 8)
   }

   private def generateRemoteLogSegmentMetadata(size: Int,
@@ -969,9 +1048,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)

storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java

@@ -151,19 +151,16 @@ public class RemoteIndexCache implements Closeable {
                 .weigher((Uuid key, Entry entry) -> {
                     return (int) entry.entrySizeBytes;
                 })
-                // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
-                // evicted (means removal due to the policy)
-                .removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
+                // This listener is invoked each time an entry is being automatically removed due to eviction. The cache will invoke this listener
+                // during the atomic operation to remove the entry (refer: https://github.com/ben-manes/caffeine/wiki/Removal),
+                // hence, care must be taken to ensure that this operation is not expensive. Note that this listener is not invoked when
+                // RemovalCause from cache is EXPLICIT or REPLACED (e.g. on Cache.invalidate(), Cache.put() etc.) For a complete list see:
+                // https://github.com/ben-manes/caffeine/blob/0cef55168986e3816314e7fdba64cb0b996dd3cc/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java#L23
+                // Hence, any operation required after removal from cache must be performed manually for these scenarios.
+                .evictionListener((Uuid key, Entry entry, RemovalCause cause) -> {
                     // Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
                     if (entry != null) {
-                        try {
-                            entry.markForCleanup();
-                        } catch (IOException e) {
-                            throw new KafkaException(e);
-                        }
-                        if (!expiredIndexes.offer(entry)) {
-                            log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
-                        }
+                        enqueueEntryForCleanup(entry, key);
                     } else {
                         log.error("Received entry as null for key {} when the it is removed from the cache.", key);
                     }
@@ -187,7 +184,11 @@ public class RemoteIndexCache implements Closeable {
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
-            internalCache.invalidate(key);
+            internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                enqueueEntryForCleanup(v, k);
+                // Returning null to remove the key from the cache
+                return null;
+            });
         } finally {
             lock.readLock().unlock();
         }
@@ -196,12 +197,27 @@ public class RemoteIndexCache implements Closeable {
     public void removeAll(Collection<Uuid> keys) {
         lock.readLock().lock();
         try {
-            internalCache.invalidateAll(keys);
+            keys.forEach(key -> internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                enqueueEntryForCleanup(v, k);
+                // Returning null to remove the key from the cache
+                return null;
+            }));
         } finally {
             lock.readLock().unlock();
         }
     }

+    private void enqueueEntryForCleanup(Entry entry, Uuid key) {
+        try {
+            entry.markForCleanup();
+            if (!expiredIndexes.offer(entry)) {
+                log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
     // Visible for testing
     public ShutdownableThread cleanerThread() {
         return cleanerThread;
