Browse Source

KAFKA-15671: Fix flaky test RemoteIndexCacheTest.testClearCacheAndIndexFilesWhenResizeCache (#14622)

Reviewers: Divij Vaidya <diviv@amazon.com>

---------

Co-authored-by: Deqi Hu <deqi.hu@shopee.com>
pull/14605/merge
hudeqi 11 months ago committed by GitHub
parent
commit
b559942c17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala

48
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala

@ -36,7 +36,7 @@ import org.mockito.invocation.InvocationOnMock @@ -36,7 +36,7 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}
import java.io.{File, FileInputStream, IOException, PrintWriter}
import java.io.{File, FileInputStream, IOException, PrintWriter, UncheckedIOException}
import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util
import java.util.{Collections, Optional}
@ -292,13 +292,13 @@ class RemoteIndexCacheTest { @@ -292,13 +292,13 @@ class RemoteIndexCacheTest {
verify(cacheEntry.txnIndex).renameTo(any(classOf[File]))
// verify no index files on disk
assertFalse(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${tpDir.toPath}")
assertFalse(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${tpDir.toPath}")
assertFalse(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${tpDir.toPath}")
assertFalse(getIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}")
}
@ -822,13 +822,6 @@ class RemoteIndexCacheTest { @@ -822,13 +822,6 @@ class RemoteIndexCacheTest {
val remoteIndexCacheDir = cache.cacheDir()
val tempSuffix = ".tmptest"
def getRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}
def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
@ -859,9 +852,9 @@ class RemoteIndexCacheTest { @@ -859,9 +852,9 @@ class RemoteIndexCacheTest {
// Index Files already exist ,rsm should not fetch them again.
verifyFetchIndexInvocation(count = 1)
// verify index files on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
}
@ParameterizedTest
@ -892,18 +885,11 @@ class RemoteIndexCacheTest { @@ -892,18 +885,11 @@ class RemoteIndexCacheTest {
def testConcurrentCacheDeletedFileExists(): Unit = {
val remoteIndexCacheDir = cache.cacheDir()
def getRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}
val entry = cache.getIndexEntry(rlsMetadata)
// verify index files on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
// Simulating a concurrency issue where deleted files already exist on disk
// This happen when cleanerThread is slow and not able to delete index entries
@ -916,7 +902,7 @@ class RemoteIndexCacheTest { @@ -916,7 +902,7 @@ class RemoteIndexCacheTest {
Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
// verify deleted file exists on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
cache.remove(rlsMetadata.remoteLogSegmentId().id())
@ -927,13 +913,13 @@ class RemoteIndexCacheTest { @@ -927,13 +913,13 @@ class RemoteIndexCacheTest {
"Failed to cleanup cache entry after invalidation")
// verify no index files on disk
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}")
}
@ -1095,7 +1081,7 @@ class RemoteIndexCacheTest { @@ -1095,7 +1081,7 @@ class RemoteIndexCacheTest {
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
} catch {
case _: NoSuchFileException => Optional.empty()
case e @ (_ : NoSuchFileException | _ : UncheckedIOException) => Optional.empty()
}
}
}

Loading…
Cancel
Save