Browse Source

KAFKA-15536: Dynamically resize remoteIndexCache (#14511)

Dynamically resize remoteIndexCache

Reviewers: Christo Lolov <lolovc@amazon.com>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
pull/14553/head
hudeqi 1 year ago committed by GitHub
parent
commit
b0b8693c72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 133
      core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
  2. 17
      storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java

133
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala

@ -23,7 +23,7 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} @@ -23,7 +23,7 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{Entry, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
@ -35,8 +35,8 @@ import org.mockito.ArgumentMatchers.any @@ -35,8 +35,8 @@ import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}
import java.io.{File, FileInputStream, FileNotFoundException, IOException, PrintWriter}
import java.nio.file.{Files, Paths}
import java.io.{File, FileInputStream, IOException, PrintWriter}
import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util
import java.util.{Collections, Optional}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
@ -517,29 +517,80 @@ class RemoteIndexCacheTest { @@ -517,29 +517,80 @@ class RemoteIndexCacheTest {
@Test
def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
def getIndexFileFromRemoteCacheDir(suffix: String) = {
try {
Files.walk(cache.cacheDir().toPath())
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
} catch {
case _: FileNotFoundException => Optional.empty()
}
assertCacheSize(0)
// getIndex for first time will call rsm#fetchIndex
val cacheEntry = cache.getIndexEntry(metadataList.head)
assertCacheSize(1)
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
cache.resizeCacheSize(1L)
// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after resizing cache.")
TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
"Failed to cleanup cache entry after resizing cache.")
// verify no index files on remote cache dir
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
assertCacheSize(0)
}
@Test
def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, entryToVerify: Entry): Unit = {
// wait until `entryToVerify` is marked for deletion
TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
"Failed to mark evicted cache entry for cleanup after resizing cache.")
TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
"Failed to cleanup evicted cache entry after resizing cache.")
// verify no index files for `entryToVerify` on remote cache dir
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent,
s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent,
s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent,
s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent,
s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
}
def verifyEntryIsKept(metadataToVerify: RemoteLogSegmentMetadata): Unit = {
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent)
assertTrue(!getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent)
}
// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries ->
// evict to 1 entry -> resize to 1 entry size -> resize to 2 entries size
val estimateEntryBytesSize = estimateOneEntryBytesSize()
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
assertCacheSize(0)
// getIndex for first time will call rsm#fetchIndex
val cacheEntry = cache.getIndexEntry(metadataList.head)
assertCacheSize(1)
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
cache.resizeCacheSize(1L)
// wait until entry is marked for deletion
@ -549,16 +600,45 @@ class RemoteIndexCacheTest { @@ -549,16 +600,45 @@ class RemoteIndexCacheTest {
"Failed to cleanup cache entry after resizing cache.")
// verify no index files on remote cache dir
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
assertTrue(cache.internalCache().estimatedSize() == 0)
assertCacheSize(0)
// Increase cache capacity to only store 2 entries
cache.resizeCacheSize(2 * estimateEntryBytesSize)
assertCacheSize(0)
val entry0 = cache.getIndexEntry(metadataList(0))
val entry1 = cache.getIndexEntry(metadataList(1))
cache.getIndexEntry(metadataList(2))
assertCacheSize(2)
verifyEntryIsEvicted(metadataList(0), entry0)
// Reduce cache capacity to only store 1 entry
cache.resizeCacheSize(1 * estimateEntryBytesSize)
assertCacheSize(1)
verifyEntryIsEvicted(metadataList(1), entry1)
// resize to the same size, all entries should be kept
cache.resizeCacheSize(1 * estimateEntryBytesSize)
// verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept
verifyEntryIsKept(metadataList(2))
assertCacheSize(1)
// increase the size
cache.resizeCacheSize(2 * estimateEntryBytesSize)
// verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept
verifyEntryIsKept(metadataList(2))
assertCacheSize(1)
}
@ParameterizedTest
@ -928,4 +1008,15 @@ class RemoteIndexCacheTest { @@ -928,4 +1008,15 @@ class RemoteIndexCacheTest {
createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
}
}
/**
 * Searches the cache directory of `cache` recursively for any regular file whose file name
 * ends with `suffix`.
 *
 * The stream returned by `Files.walk` keeps directory handles open until it is closed, so it is
 * always closed here before returning; this helper is polled repeatedly by the tests.
 *
 * @param cache  the remote index cache whose `cacheDir()` is walked
 * @param suffix the file-name suffix to look for
 * @return an `Optional` holding one matching path, or an empty `Optional` when no file matches
 *         or when a file/directory disappears while the directory tree is being walked
 */
private def getIndexFileFromRemoteCacheDir(cache: RemoteIndexCache, suffix: String) = {
  try {
    val walkedPaths = Files.walk(cache.cacheDir().toPath())
    try {
      walkedPaths
        .filter(Files.isRegularFile(_))
        .filter(path => path.getFileName.toString.endsWith(suffix))
        .findAny()
    } finally {
      // Release the directory handles held by the walk, even if the traversal throws.
      walkedPaths.close()
    }
  } catch {
    case _: NoSuchFileException => Optional.empty()
    // I/O failures hit lazily during traversal are rethrown wrapped in UncheckedIOException;
    // a path vanishing mid-walk is treated as "no such file" as well.
    case e: java.io.UncheckedIOException if e.getCause.isInstanceOf[NoSuchFileException] => Optional.empty()
  }
}
}

17
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java

@ -47,6 +47,7 @@ import java.util.Collection; @@ -47,6 +47,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -137,15 +138,8 @@ public class RemoteIndexCache implements Closeable { @@ -137,15 +138,8 @@ public class RemoteIndexCache implements Closeable {
public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
lock.writeLock().lock();
try {
// When resizing the cache, we always start with an empty cache. There are two main reasons:
// 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
// cache to the new cache in time when resizing inside.
// 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
// in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
// will be inconsistent.
internalCache.invalidateAll();
log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
internalCache.policy().eviction().orElseThrow(() -> new NoSuchElementException("No eviction policy is set for the remote index cache.")
).setMaximum(remoteLogIndexFileCacheSize);
} finally {
lock.writeLock().unlock();
}
@ -716,4 +710,9 @@ public class RemoteIndexCache implements Closeable { @@ -716,4 +710,9 @@ public class RemoteIndexCache implements Closeable {
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
}
/**
 * Returns the index file-name prefix generated for the given segment metadata, followed by
 * {@code LogFileUtils.DELETED_FILE_SUFFIX}.
 *
 * Visible for testing.
 *
 * @param remoteLogSegmentMetadata metadata of the remote log segment the file name is built for
 * @return the generated prefix concatenated with the deleted-file suffix
 */
public static String remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
    final String indexFileNamePrefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata);
    return indexFileNamePrefix + LogFileUtils.DELETED_FILE_SUFFIX;
}
}
Loading…
Cancel
Save