
KAFKA-15176: add tests for tiered storage metrics (#13999)

Added tests for metrics:
1. RemoteLogReaderTaskQueueSize
2. RemoteLogReaderAvgIdlePercent
3. RemoteLogManagerTasksAvgIdlePercent

Also added tests verifying the handling of OffsetOutOfRangeException thrown while reading logs.
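
For reference, a minimal sketch of how these gauges can be read back in a test, mirroring the yammerMetricValue helpers added in this patch (the helper name gaugeValue is illustrative; matching metrics by an MBean-name substring is the convention the tests below use):

    import com.yammer.metrics.core.Gauge;
    import org.apache.kafka.server.metrics.KafkaYammerMetrics;

    // Find a registered Yammer gauge whose MBean name contains the given
    // string and return its current value; fail loudly if none is found.
    static Object gaugeValue(String name) {
        return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
                .filter(e -> e.getKey().getMBeanName().contains(name))
                .map(e -> (Object) ((Gauge<?>) e.getValue()).value())
                .findFirst()
                .orElseThrow(() -> new AssertionError("metric not found: " + name));
    }

    // usage: assertEquals(1.0, (Double) gaugeValue("RemoteLogManagerTasksAvgIdlePercent"));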

Reviewers: Christo Lolov <christololov@gmail.com>, Satish Duggana <satishd@apache.org>
Committed by Luke Chen via GitHub · commit 27ea025e33 · pull/14065/head
Changed files (5):
  1. checkstyle/suppressions.xml (2 lines changed)
  2. core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java (80 lines changed)
  3. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (277 lines changed)
  4. core/src/test/scala/unit/kafka/utils/TestUtils.scala (14 lines changed)
  5. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java (2 lines changed)

checkstyle/suppressions.xml (2 lines changed)

@@ -39,7 +39,7 @@
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>

core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java (80 lines changed)

@@ -16,6 +16,7 @@
*/
package kafka.log.remote;
import com.yammer.metrics.core.Gauge;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
@@ -49,6 +50,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
@@ -87,6 +89,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT;
@@ -102,6 +105,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
@@ -366,7 +370,83 @@ public class RemoteLogManagerTest {
assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
assertEquals(10, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
}
@Test
void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
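// Test strategy: stub copyLogSegmentData to block on a latch so the copy task stays busy,
// then assert that the RemoteLogManagerTasksAvgIdlePercent gauge drops below 1.0 once the
// leadership change triggers the task.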
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
File tempFile = TestUtils.tempFile();
File mockProducerSnapshotIndex = TestUtils.tempFile();
File tempDir = TestUtils.tempDirectory();
// create 2 log segments, with base offsets 0 and 150
LogSegment oldSegment = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);
when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
FileRecords fileRecords = mock(FileRecords.class);
when(oldSegment.log()).thenReturn(fileRecords);
when(fileRecords.file()).thenReturn(tempFile);
when(fileRecords.sizeInBytes()).thenReturn(10);
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(250L);
LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
txnFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
when(oldSegment.txnIndex()).thenReturn(txnIndex);
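// a pre-completed future so the metadata add/update stubs below return immediately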
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(ans -> {
// block the copy until the assertions below have run
latch.await();
return null;
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
Partition mockFollowerPartition = mockPartition(followerTopicIdPartition);
// before any task runs, the remote log manager tasks should all be idle
assertEquals(1.0, yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.singleton(mockFollowerPartition), topicIds);
assertTrue(yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
// unlock copyLogSegmentData
latch.countDown();
}
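// Looks up a registered Yammer gauge by MBean-name substring and returns its current value.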
private double yammerMetricValue(String name) {
Gauge<Double> gauge = (Gauge) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
.filter(e -> e.getKey().getMBeanName().contains(name))
.findFirst()
.get()
.getValue();
return gauge.value();
}
@Test

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (277 lines changed)

@@ -32,7 +32,7 @@ import kafka.log._
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
import kafka.server.epoch.util.MockBlockingSender
-import kafka.utils.{Pool, TestUtils}
+import kafka.utils.{Pool, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData
@@ -60,22 +60,24 @@ import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.{MockScheduler, MockTime}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import com.yammer.metrics.core.Gauge
import kafka.log.remote.RemoteLogManager
import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction}
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.util.timer.MockTimer
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
-import org.mockito.ArgumentMatchers.{any, anyInt, anyMap, anySet, anyString}
-import org.mockito.Mockito.{doReturn, mock, mockConstruction, never, reset, spy, times, verify, verifyNoMoreInteractions, when}
+import org.mockito.ArgumentMatchers.{any, anyInt, anyLong, anyMap, anySet, anyString}
+import org.mockito.Mockito.{doAnswer, doReturn, mock, mockConstruction, never, reset, spy, times, verify, verifyNoMoreInteractions, when}
import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters.RichOptionForJava8
@@ -89,8 +91,10 @@ class ReplicaManagerTest {
private val topicNames = scala.Predef.Map(topicId -> "test-topic")
private val transactionalId = "txn"
private val time = new MockTime
-private val scheduler = new MockScheduler(time)
private val metrics = new Metrics
+private val startOffset = 0
+private val endOffset = 20
+private val highHW = 18
private var alterPartitionManager: AlterPartitionManager = _
private var config: KafkaConfig = _
private var quotaManager: QuotaManagers = _
@@ -2894,14 +2898,19 @@
mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None,
mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None,
isShuttingDown: AtomicBoolean = new AtomicBoolean(false),
-enableRemoteStorage: Boolean = false
+enableRemoteStorage: Boolean = false,
+shouldMockLog: Boolean = false,
+remoteLogManager: Option[RemoteLogManager] = None
): ReplicaManager = {
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
props.put("log.dirs", path1 + "," + path2)
propsModifier.apply(props)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
-val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps))
+val mockLog = setupMockLog(path1)
+val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None)
val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId))
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
@@ -2918,6 +2927,8 @@
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
+val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
+purgatoryName = "DelayedRemoteFetch", timer, reaperEnabled = false)
// Set up transactions
val metadataResponseTopic = Seq(new MetadataResponseTopic()
@@ -2935,7 +2946,7 @@
metrics = metrics,
config = config,
time = time,
-scheduler = scheduler,
+scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
metadataCache = metadataCache,
@@ -2946,9 +2957,15 @@
delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
+delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
threadNamePrefix = Option(this.getClass.getName),
-remoteLogManager = if (enableRemoteStorage) Some(mockRemoteLogManager) else None,
-addPartitionsToTxnManager = Some(addPartitionsToTxnManager)) {
+addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
+remoteLogManager = if (enableRemoteStorage) {
+if (remoteLogManager.isDefined)
+remoteLogManager
+else
+Some(mockRemoteLogManager)
+} else None) {
override protected def createReplicaFetcherManager(
metrics: Metrics,
@@ -3379,6 +3396,244 @@
testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, Errors.NONE)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(booleans = Array(true, false))
def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): Unit = {
val replicaId = if (isFromFollower) 1 else -1
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
// create a replicaManager with remoteLog enabled
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava
val leaderEpoch = 0
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(
new LeaderAndIsrPartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(leaderEpoch)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
// reading the log will throw OffsetOutOfRangeException, which is handled separately for follower and consumer fetches
val result = replicaManager.readFromLog(params, Seq(tidp0 -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, false)
if (isFromFollower) {
// a follower fetch should get an OFFSET_MOVED_TO_TIERED_STORAGE error, since the data is already available in the remote log
assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, result.head._2.error)
} else {
assertEquals(Errors.NONE, result.head._2.error)
}
assertEquals(startOffset, result.head._2.leaderLogStartOffset)
assertEquals(endOffset, result.head._2.leaderLogEndOffset)
assertEquals(highHW, result.head._2.highWatermark)
if (isFromFollower) {
assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
} else {
// for a consumer fetch, a delayedRemoteStorageFetch should be returned so the remote fetch can be awaited
assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
}
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(booleans = Array(true, false))
def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): Unit = {
val replicaId = if (isFromFollower) 1 else -1
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
// create a replicaManager with remoteLog enabled
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava
val leaderEpoch = 0
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(
new LeaderAndIsrPartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(leaderEpoch)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
val fetchOffset = 1
def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
assertEquals(1, responseStatus.size)
assertEquals(tidp0, responseStatus.toMap.keySet.head)
val fetchPartitionData: FetchPartitionData = responseStatus.toMap.get(tidp0).get
// only a follower fetch should reach this callback, since a consumer fetch enters the remote-fetch purgatory
assertTrue(isFromFollower)
assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, fetchPartitionData.error)
assertEquals(startOffset, fetchPartitionData.logStartOffset)
assertEquals(highHW, fetchPartitionData.highWatermark)
}
// reading the log will throw OffsetOutOfRangeException, which is handled separately for follower and consumer fetches
replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
if (isFromFollower) {
verify(mockRemoteLogManager, never()).asyncRead(remoteStorageFetchInfoArg.capture(), any())
} else {
verify(mockRemoteLogManager).asyncRead(remoteStorageFetchInfoArg.capture(), any())
val remoteStorageFetchInfo = remoteStorageFetchInfoArg.getValue
assertEquals(tp0, remoteStorageFetchInfo.topicPartition)
assertEquals(fetchOffset, remoteStorageFetchInfo.fetchInfo.fetchOffset)
assertEquals(topicId, remoteStorageFetchInfo.fetchInfo.topicId)
assertEquals(startOffset, remoteStorageFetchInfo.fetchInfo.logStartOffset)
assertEquals(leaderEpoch, remoteStorageFetchInfo.fetchInfo.currentLeaderEpoch.get())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
@Test
def testRemoteLogReaderMetrics(): Unit = {
val replicaId = -1
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)
val props = new Properties()
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
// set the number of remote log reader threads to 2
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2)
val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
val mockLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats
val remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
0,
TestUtils.tempRelativeDir("data").getAbsolutePath,
"clusterId",
time,
_ => Optional.of(mockLog),
brokerTopicStats)
val spyRLM = spy(remoteLogManager)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM))
try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava
val leaderEpoch = 0
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(
new LeaderAndIsrPartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(leaderEpoch)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
val fetchOffset = 1
def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
assertEquals(1, responseStatus.size)
assertEquals(tidp0, responseStatus.toMap.keySet.head)
}
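// consumer fetches will park in the remote-fetch purgatory, so the callback above only sanity-checks the response shape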
assertEquals(1.0, yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double])
assertEquals(0, yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
// the reader thread pool size is 2
val queueLatch = new CountDownLatch(2)
val doneLatch = new CountDownLatch(1)
doAnswer(_ => {
queueLatch.countDown()
// block until verification completes
doneLatch.await()
new FetchDataInfo(new LogOffsetMetadata(startOffset), mock(classOf[Records]))
}).when(spyRLM).read(any())
// submit 5 asyncRead tasks: 2 occupy the reader threads, so 3 should be queued
for (i <- 1 to 5)
replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
// wait until 2 tasks are running, so that all available threads are in use
queueLatch.await()
// the RemoteLogReader threads should no longer be fully idle
assertTrue(yammerMetricValue("RemoteLogReaderAvgIdlePercent").asInstanceOf[Double] < 1.0)
// the RemoteLogReader should have tasks queued
assertEquals(3, yammerMetricValue("RemoteLogReaderTaskQueueSize").asInstanceOf[Int])
// unlock all tasks
doneLatch.countDown()
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
private def yammerMetricValue(name: String): Any = {
val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
.getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
metric match {
case m: Gauge[_] => m.value
case m => fail(s"Unexpected broker metric of class ${m.getClass}")
}
}
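// Mocks a UnifiedLog with remote log storage enabled whose reported log start offset initially
// sits past the fetch offset, so the first read triggers OffsetOutOfRangeException.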
private def setupMockLog(path: String): UnifiedLog = {
val mockLog = mock(classOf[UnifiedLog])
when(mockLog.parentDir).thenReturn(path)
when(mockLog.topicId).thenReturn(Some(topicId))
when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0))
when(mockLog.highWatermark).thenReturn(highHW)
when(mockLog.updateHighWatermark(anyLong())).thenReturn(0L)
when(mockLog.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata(10))
when(mockLog.maybeIncrementHighWatermark(any(classOf[LogOffsetMetadata]))).thenReturn(None)
when(mockLog.endOffsetForEpoch(anyInt())).thenReturn(None)
// return a high log start offset on the first call to trigger OffsetOutOfRangeException, then the real start offset afterwards
when(mockLog.logStartOffset).thenReturn(endOffset).thenReturn(startOffset)
when(mockLog.logEndOffset).thenReturn(endOffset)
when(mockLog.localLogStartOffset()).thenReturn(endOffset - 10)
when(mockLog.remoteLogEnabled()).thenReturn(true)
mockLog
}
private def testStopReplicaWithExistingPartition(leaderEpoch: Int,
deletePartition: Boolean,
throwIOException: Boolean,

core/src/test/scala/unit/kafka/utils/TestUtils.scala (14 lines changed)

@@ -78,6 +78,8 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException
import org.apache.zookeeper.ZooDefs._
import org.apache.zookeeper.data.ACL
import org.junit.jupiter.api.Assertions._
+import org.mockito.ArgumentMatchers.{any, anyBoolean}
+import org.mockito.Mockito
import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -1408,8 +1410,9 @@ object TestUtils extends Logging {
time: MockTime = new MockTime(),
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
recoveryThreadsPerDataDir: Int = 4,
-transactionVerificationEnabled: Boolean = false): LogManager = {
-new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
+transactionVerificationEnabled: Boolean = false,
+log: Option[UnifiedLog] = None): LogManager = {
+val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
initialOfflineDirs = Array.empty[File],
configRepository = configRepository,
initialDefaultConfig = defaultConfig,
@@ -1429,6 +1432,13 @@
keepPartitionMetadataFile = true,
interBrokerProtocolVersion = interBrokerProtocolVersion,
remoteStorageSystemEnable = false)
+if (log.isDefined) {
+val spyLogManager = Mockito.spy(logManager)
+Mockito.doReturn(log.get).when(spyLogManager).getOrCreateLog(any(), anyBoolean(), anyBoolean(), any)
+spyLogManager
+} else
+logManager
}
class MockAlterPartitionManager extends AlterPartitionManager {

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java (2 lines changed)

@@ -108,7 +108,7 @@ public class CheckpointBench {
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
-1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latest(), 4, false);
+1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latest(), 4, false, Option.empty());
scheduler.startup();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
final MetadataCache metadataCache =
