From b559942c172558efaf07427c80b1f3276b60a469 Mon Sep 17 00:00:00 2001 From: hudeqi <1217150961@qq.com> Date: Wed, 25 Oct 2023 17:18:55 +0800 Subject: [PATCH] KAFKA-15671: Fix flaky test RemoteIndexCacheTest.testClearCacheAndIndexFilesWhenResizeCache (#14622) Reviewers: Divij Vaidya --------- Co-authored-by: Deqi Hu --- .../log/remote/RemoteIndexCacheTest.scala | 48 +++++++------------ 1 file changed, 17 insertions(+), 31 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala index 3a3b4d2c91c..48b01503bb6 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala @@ -36,7 +36,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.Mockito._ import org.slf4j.{Logger, LoggerFactory} -import java.io.{File, FileInputStream, IOException, PrintWriter} +import java.io.{File, FileInputStream, IOException, PrintWriter, UncheckedIOException} import java.nio.file.{Files, NoSuchFileException, Paths} import java.util import java.util.{Collections, Optional} @@ -292,13 +292,13 @@ class RemoteIndexCacheTest { verify(cacheEntry.txnIndex).renameTo(any(classOf[File])) // verify no index files on disk - assertFalse(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should not be present on disk at ${tpDir.toPath}") - assertFalse(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should not be present on disk at ${tpDir.toPath}") - assertFalse(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should not be present on disk at ${tpDir.toPath}") - assertFalse(getIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}") } @@ -822,13 +822,6 @@ class RemoteIndexCacheTest { val remoteIndexCacheDir = cache.cacheDir() val tempSuffix = ".tmptest" - def getRemoteCacheIndexFileFromDisk(suffix: String) = { - Files.walk(remoteIndexCacheDir.toPath) - .filter(Files.isRegularFile(_)) - .filter(path => path.getFileName.toString.endsWith(suffix)) - .findAny() - } - def renameRemoteCacheIndexFileFromDisk(suffix: String) = { Files.walk(remoteIndexCacheDir.toPath) .filter(Files.isRegularFile(_)) @@ -859,9 +852,9 @@ class RemoteIndexCacheTest { // Index Files already exist ,rsm should not fetch them again. verifyFetchIndexInvocation(count = 1) // verify index files on disk - assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") - assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}") - assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}") } @ParameterizedTest @@ -892,18 +885,11 @@ class RemoteIndexCacheTest { def testConcurrentCacheDeletedFileExists(): Unit = { val remoteIndexCacheDir = cache.cacheDir() - def getRemoteCacheIndexFileFromDisk(suffix: String) = { - Files.walk(remoteIndexCacheDir.toPath) - .filter(Files.isRegularFile(_)) - .filter(path => path.getFileName.toString.endsWith(suffix)) - .findAny() - } - val entry = cache.getIndexEntry(rlsMetadata) // verify index files on disk - assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") - assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}") - assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}") // Simulating a concurrency issue where deleted files already exist on disk // This happen when cleanerThread is slow and not able to delete index entries @@ -916,7 +902,7 @@ class RemoteIndexCacheTest { Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX))) // verify deleted file exists on disk - assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}") cache.remove(rlsMetadata.remoteLogSegmentId().id()) @@ -927,13 +913,13 @@ class RemoteIndexCacheTest { "Failed to cleanup cache entry after invalidation") // verify no index files on disk - waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}") - waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}") - waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}") - waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}") } @@ -1095,7 +1081,7 @@ class RemoteIndexCacheTest { .filter(path => path.getFileName.toString.endsWith(suffix)) .findAny() } catch { - case _: NoSuchFileException => Optional.empty() + case e @ (_ : NoSuchFileException | _ : UncheckedIOException) => Optional.empty() } } }