@ -20,6 +20,7 @@ import kafka.cluster.EndPoint;
@@ -20,6 +20,7 @@ import kafka.cluster.EndPoint;
import kafka.cluster.Partition ;
import kafka.log.LogSegment ;
import kafka.log.UnifiedLog ;
import kafka.server.BrokerTopicStats ;
import kafka.server.KafkaConfig ;
import org.apache.kafka.common.KafkaException ;
import org.apache.kafka.common.TopicIdPartition ;
@ -47,6 +48,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
@@ -47,6 +48,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException ;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager ;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType ;
import org.apache.kafka.server.metrics.KafkaMetricsGroup ;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint ;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint ;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache ;
@ -54,6 +56,7 @@ import org.apache.kafka.storage.internals.log.EpochEntry;
@@ -54,6 +56,7 @@ import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LazyIndex ;
import org.apache.kafka.storage.internals.log.OffsetIndex ;
import org.apache.kafka.storage.internals.log.ProducerStateManager ;
import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool ;
import org.apache.kafka.storage.internals.log.TimeIndex ;
import org.apache.kafka.storage.internals.log.TransactionIndex ;
import org.apache.kafka.test.TestUtils ;
@ -61,6 +64,7 @@ import org.junit.jupiter.api.BeforeEach;
@@ -61,6 +64,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test ;
import org.mockito.ArgumentCaptor ;
import org.mockito.InOrder ;
import org.mockito.MockedConstruction ;
import org.mockito.Mockito ;
import scala.Option ;
import scala.collection.JavaConverters ;
@ -83,7 +87,10 @@ import java.util.Optional;
@@ -83,7 +87,10 @@ import java.util.Optional;
import java.util.Properties ;
import java.util.TreeMap ;
import java.util.concurrent.CompletableFuture ;
import java.util.stream.Collectors ;
import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT ;
import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_READER_METRICS_NAME_PREFIX ;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX ;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow ;
import static org.junit.jupiter.api.Assertions.assertEquals ;
@ -93,14 +100,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -93,14 +100,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any ;
import static org.mockito.ArgumentMatchers.anyInt ;
import static org.mockito.ArgumentMatchers.anyLong ;
import static org.mockito.ArgumentMatchers.anyString ;
import static org.mockito.ArgumentMatchers.eq ;
import static org.mockito.Mockito.doNothing ;
import static org.mockito.Mockito.doThrow ;
import static org.mockito.Mockito.inOrder ;
import static org.mockito.Mockito.mock ;
import static org.mockito.Mockito.mockConstruction ;
import static org.mockito.Mockito.never ;
import static org.mockito.Mockito.spy ;
import static org.mockito.Mockito.times ;
import static org.mockito.Mockito.verify ;
import static org.mockito.Mockito.verifyNoMoreInteractions ;
import static org.mockito.Mockito.when ;
public class RemoteLogManagerTest {
@ -112,6 +123,7 @@ public class RemoteLogManagerTest {
@@ -112,6 +123,7 @@ public class RemoteLogManagerTest {
RemoteStorageManager remoteStorageManager = mock ( RemoteStorageManager . class ) ;
RemoteLogMetadataManager remoteLogMetadataManager = mock ( RemoteLogMetadataManager . class ) ;
RemoteLogManagerConfig remoteLogManagerConfig = null ;
BrokerTopicStats brokerTopicStats = new BrokerTopicStats ( ) ;
RemoteLogManager remoteLogManager = null ;
TopicIdPartition leaderTopicIdPartition = new TopicIdPartition ( Uuid . randomUuid ( ) , new TopicPartition ( "Leader" , 0 ) ) ;
@ -143,7 +155,10 @@ public class RemoteLogManagerTest {
@@ -143,7 +155,10 @@ public class RemoteLogManagerTest {
topicIds . put ( followerTopicIdPartition . topicPartition ( ) . topic ( ) , followerTopicIdPartition . topicId ( ) ) ;
Properties props = new Properties ( ) ;
remoteLogManagerConfig = createRLMConfig ( props ) ;
remoteLogManager = new RemoteLogManager ( remoteLogManagerConfig , brokerId , logDir , clusterId , time , tp - > Optional . of ( mockLog ) ) {
kafka . utils . TestUtils . clearYammerMetrics ( ) ;
remoteLogManager = new RemoteLogManager ( remoteLogManagerConfig , brokerId , logDir , clusterId , time , tp - > Optional . of ( mockLog ) , brokerTopicStats ) {
public RemoteStorageManager createRemoteStorageManager ( ) {
return remoteStorageManager ;
}
@ -252,6 +267,8 @@ public class RemoteLogManagerTest {
@@ -252,6 +267,8 @@ public class RemoteLogManagerTest {
long nextSegmentStartOffset = 150L ;
long oldSegmentEndOffset = nextSegmentStartOffset - 1 ;
when ( mockLog . topicPartition ( ) ) . thenReturn ( leaderTopicIdPartition . topicPartition ( ) ) ;
// leader epoch preparation
checkpoint . write ( totalEpochEntries ) ;
LeaderEpochFileCache cache = new LeaderEpochFileCache ( leaderTopicIdPartition . topicPartition ( ) , checkpoint ) ;
@ -271,6 +288,7 @@ public class RemoteLogManagerTest {
@@ -271,6 +288,7 @@ public class RemoteLogManagerTest {
FileRecords fileRecords = mock ( FileRecords . class ) ;
when ( oldSegment . log ( ) ) . thenReturn ( fileRecords ) ;
when ( fileRecords . file ( ) ) . thenReturn ( tempFile ) ;
when ( fileRecords . sizeInBytes ( ) ) . thenReturn ( 10 ) ;
when ( oldSegment . readNextOffset ( ) ) . thenReturn ( nextSegmentStartOffset ) ;
when ( mockLog . activeSegment ( ) ) . thenReturn ( activeSegment ) ;
@ -297,6 +315,15 @@ public class RemoteLogManagerTest {
@@ -297,6 +315,15 @@ public class RemoteLogManagerTest {
when ( remoteLogMetadataManager . updateRemoteLogSegmentMetadata ( any ( RemoteLogSegmentMetadataUpdate . class ) ) ) . thenReturn ( dummyFuture ) ;
doNothing ( ) . when ( remoteStorageManager ) . copyLogSegmentData ( any ( RemoteLogSegmentMetadata . class ) , any ( LogSegmentData . class ) ) ;
// Verify the metrics for remote writes and for failures is zero before attempt to copy log segment
assertEquals ( 0 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . remoteWriteRequestRate ( ) . count ( ) ) ;
assertEquals ( 0 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . remoteBytesOutRate ( ) . count ( ) ) ;
assertEquals ( 0 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . failedRemoteWriteRequestRate ( ) . count ( ) ) ;
// Verify aggregate metrics
assertEquals ( 0 , brokerTopicStats . allTopicsStats ( ) . remoteWriteRequestRate ( ) . count ( ) ) ;
assertEquals ( 0 , brokerTopicStats . allTopicsStats ( ) . remoteBytesOutRate ( ) . count ( ) ) ;
assertEquals ( 0 , brokerTopicStats . allTopicsStats ( ) . failedRemoteWriteRequestRate ( ) . count ( ) ) ;
RemoteLogManager . RLMTask task = remoteLogManager . new RLMTask ( leaderTopicIdPartition ) ;
task . convertToLeader ( 2 ) ;
task . copyLogSegmentsToRemote ( mockLog ) ;
@ -329,6 +356,92 @@ public class RemoteLogManagerTest {
@@ -329,6 +356,92 @@ public class RemoteLogManagerTest {
ArgumentCaptor < Long > argument = ArgumentCaptor . forClass ( Long . class ) ;
verify ( mockLog , times ( 1 ) ) . updateHighestOffsetInRemoteStorage ( argument . capture ( ) ) ;
assertEquals ( oldSegmentEndOffset , argument . getValue ( ) ) ;
// Verify the metric for remote writes is updated correctly
assertEquals ( 1 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . remoteWriteRequestRate ( ) . count ( ) ) ;
assertEquals ( 10 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . remoteBytesOutRate ( ) . count ( ) ) ;
// Verify we did not report any failure for remote writes
assertEquals ( 0 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . failedRemoteWriteRequestRate ( ) . count ( ) ) ;
// Verify aggregate metrics
assertEquals ( 1 , brokerTopicStats . allTopicsStats ( ) . remoteWriteRequestRate ( ) . count ( ) ) ;
assertEquals ( 10 , brokerTopicStats . allTopicsStats ( ) . remoteBytesOutRate ( ) . count ( ) ) ;
assertEquals ( 0 , brokerTopicStats . allTopicsStats ( ) . failedRemoteWriteRequestRate ( ) . count ( ) ) ;
}
@Test
void testMetricsUpdateOnCopyLogSegmentsFailure ( ) throws Exception {
long oldSegmentStartOffset = 0L ;
long nextSegmentStartOffset = 150L ;
when ( mockLog . topicPartition ( ) ) . thenReturn ( leaderTopicIdPartition . topicPartition ( ) ) ;
// leader epoch preparation
checkpoint . write ( totalEpochEntries ) ;
LeaderEpochFileCache cache = new LeaderEpochFileCache ( leaderTopicIdPartition . topicPartition ( ) , checkpoint ) ;
when ( mockLog . leaderEpochCache ( ) ) . thenReturn ( Option . apply ( cache ) ) ;
when ( remoteLogMetadataManager . highestOffsetForEpoch ( any ( TopicIdPartition . class ) , anyInt ( ) ) ) . thenReturn ( Optional . of ( 0L ) ) ;
File tempFile = TestUtils . tempFile ( ) ;
File mockProducerSnapshotIndex = TestUtils . tempFile ( ) ;
File tempDir = TestUtils . tempDirectory ( ) ;
// create 2 log segments, with 0 and 150 as log start offset
LogSegment oldSegment = mock ( LogSegment . class ) ;
LogSegment activeSegment = mock ( LogSegment . class ) ;
when ( oldSegment . baseOffset ( ) ) . thenReturn ( oldSegmentStartOffset ) ;
when ( activeSegment . baseOffset ( ) ) . thenReturn ( nextSegmentStartOffset ) ;
FileRecords fileRecords = mock ( FileRecords . class ) ;
when ( oldSegment . log ( ) ) . thenReturn ( fileRecords ) ;
when ( fileRecords . file ( ) ) . thenReturn ( tempFile ) ;
when ( fileRecords . sizeInBytes ( ) ) . thenReturn ( 10 ) ;
when ( oldSegment . readNextOffset ( ) ) . thenReturn ( nextSegmentStartOffset ) ;
when ( mockLog . activeSegment ( ) ) . thenReturn ( activeSegment ) ;
when ( mockLog . logStartOffset ( ) ) . thenReturn ( oldSegmentStartOffset ) ;
when ( mockLog . logSegments ( anyLong ( ) , anyLong ( ) ) ) . thenReturn ( JavaConverters . collectionAsScalaIterable ( Arrays . asList ( oldSegment , activeSegment ) ) ) ;
ProducerStateManager mockStateManager = mock ( ProducerStateManager . class ) ;
when ( mockLog . producerStateManager ( ) ) . thenReturn ( mockStateManager ) ;
when ( mockStateManager . fetchSnapshot ( anyLong ( ) ) ) . thenReturn ( Optional . of ( mockProducerSnapshotIndex ) ) ;
when ( mockLog . lastStableOffset ( ) ) . thenReturn ( 250L ) ;
LazyIndex idx = LazyIndex . forOffset ( UnifiedLog . offsetIndexFile ( tempDir , oldSegmentStartOffset , "" ) , oldSegmentStartOffset , 1000 ) ;
LazyIndex timeIdx = LazyIndex . forTime ( UnifiedLog . timeIndexFile ( tempDir , oldSegmentStartOffset , "" ) , oldSegmentStartOffset , 1500 ) ;
File txnFile = UnifiedLog . transactionIndexFile ( tempDir , oldSegmentStartOffset , "" ) ;
txnFile . createNewFile ( ) ;
TransactionIndex txnIndex = new TransactionIndex ( oldSegmentStartOffset , txnFile ) ;
when ( oldSegment . lazyTimeIndex ( ) ) . thenReturn ( timeIdx ) ;
when ( oldSegment . lazyOffsetIndex ( ) ) . thenReturn ( idx ) ;
when ( oldSegment . txnIndex ( ) ) . thenReturn ( txnIndex ) ;
CompletableFuture < Void > dummyFuture = new CompletableFuture < > ( ) ;
dummyFuture . complete ( null ) ;
when ( remoteLogMetadataManager . addRemoteLogSegmentMetadata ( any ( RemoteLogSegmentMetadata . class ) ) ) . thenReturn ( dummyFuture ) ;
doThrow ( new RuntimeException ( ) ) . when ( remoteStorageManager ) . copyLogSegmentData ( any ( RemoteLogSegmentMetadata . class ) , any ( LogSegmentData . class ) ) ;
// Verify the metrics for remote write requests/failures is zero before attempt to copy log segment
assertEquals ( 0 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . remoteWriteRequestRate ( ) . count ( ) ) ;
assertEquals ( 0 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . failedRemoteWriteRequestRate ( ) . count ( ) ) ;
// Verify aggregate metrics
assertEquals ( 0 , brokerTopicStats . allTopicsStats ( ) . remoteWriteRequestRate ( ) . count ( ) ) ;
assertEquals ( 0 , brokerTopicStats . allTopicsStats ( ) . failedRemoteWriteRequestRate ( ) . count ( ) ) ;
RemoteLogManager . RLMTask task = remoteLogManager . new RLMTask ( leaderTopicIdPartition ) ;
task . convertToLeader ( 2 ) ;
task . copyLogSegmentsToRemote ( mockLog ) ;
// Verify we attempted to copy log segment metadata to remote storage
verify ( remoteStorageManager , times ( 1 ) ) . copyLogSegmentData ( any ( RemoteLogSegmentMetadata . class ) , any ( LogSegmentData . class ) ) ;
// Verify we should not have updated the highest offset because of write failure
verify ( mockLog , times ( 0 ) ) . updateHighestOffsetInRemoteStorage ( anyLong ( ) ) ;
// Verify the metric for remote write requests/failures was updated.
assertEquals ( 1 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . remoteWriteRequestRate ( ) . count ( ) ) ;
assertEquals ( 1 , brokerTopicStats . topicStats ( leaderTopicIdPartition . topic ( ) ) . failedRemoteWriteRequestRate ( ) . count ( ) ) ;
// Verify aggregate metrics
assertEquals ( 1 , brokerTopicStats . allTopicsStats ( ) . remoteWriteRequestRate ( ) . count ( ) ) ;
assertEquals ( 1 , brokerTopicStats . allTopicsStats ( ) . failedRemoteWriteRequestRate ( ) . count ( ) ) ;
}
@Test
@ -412,7 +525,7 @@ public class RemoteLogManagerTest {
@@ -412,7 +525,7 @@ public class RemoteLogManagerTest {
void testGetClassLoaderAwareRemoteStorageManager ( ) throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager = mock ( ClassLoaderAwareRemoteStorageManager . class ) ;
RemoteLogManager remoteLogManager =
new RemoteLogManager ( remoteLogManagerConfig , brokerId , logDir , clusterId , time , t - > Optional . empty ( ) ) {
new RemoteLogManager ( remoteLogManagerConfig , brokerId , logDir , clusterId , time , t - > Optional . empty ( ) , brokerTopicStats ) {
public RemoteStorageManager createRemoteStorageManager ( ) {
return rsmManager ;
}
@ -589,6 +702,47 @@ public class RemoteLogManagerTest {
@@ -589,6 +702,47 @@ public class RemoteLogManagerTest {
inorder . verify ( remoteLogMetadataManager , times ( 1 ) ) . close ( ) ;
}
@Test
public void testRemoveMetricsOnClose ( ) throws IOException {
MockedConstruction < KafkaMetricsGroup > mockMetricsGroupCtor = mockConstruction ( KafkaMetricsGroup . class ) ;
try {
RemoteLogManager remoteLogManager = new RemoteLogManager ( remoteLogManagerConfig , brokerId , logDir , clusterId ,
time , tp - > Optional . of ( mockLog ) , brokerTopicStats ) {
public RemoteStorageManager createRemoteStorageManager ( ) {
return remoteStorageManager ;
}
public RemoteLogMetadataManager createRemoteLogMetadataManager ( ) {
return remoteLogMetadataManager ;
}
} ;
// Close RemoteLogManager so that metrics are removed
remoteLogManager . close ( ) ;
KafkaMetricsGroup mockRlmMetricsGroup = mockMetricsGroupCtor . constructed ( ) . get ( 0 ) ;
KafkaMetricsGroup mockThreadPoolMetricsGroup = mockMetricsGroupCtor . constructed ( ) . get ( 1 ) ;
List < String > remoteLogManagerMetricNames = Collections . singletonList ( REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT ) ;
List < String > remoteStorageThreadPoolMetricNames = RemoteStorageThreadPool . METRIC_SUFFIXES
. stream ( )
. map ( suffix - > REMOTE_LOG_READER_METRICS_NAME_PREFIX + suffix )
. collect ( Collectors . toList ( ) ) ;
verify ( mockRlmMetricsGroup , times ( remoteLogManagerMetricNames . size ( ) ) ) . newGauge ( anyString ( ) , any ( ) ) ;
// Verify that the RemoteLogManager metrics are removed
remoteLogManagerMetricNames . forEach ( metricName - > verify ( mockRlmMetricsGroup ) . removeMetric ( metricName ) ) ;
verify ( mockThreadPoolMetricsGroup , times ( remoteStorageThreadPoolMetricNames . size ( ) ) ) . newGauge ( anyString ( ) , any ( ) ) ;
// Verify that the RemoteStorageThreadPool metrics are removed
remoteStorageThreadPoolMetricNames . forEach ( metricName - > verify ( mockThreadPoolMetricsGroup ) . removeMetric ( metricName ) ) ;
verifyNoMoreInteractions ( mockRlmMetricsGroup ) ;
verifyNoMoreInteractions ( mockThreadPoolMetricsGroup ) ;
} finally {
mockMetricsGroupCtor . close ( ) ;
}
}
private Partition mockPartition ( TopicIdPartition topicIdPartition ) {
TopicPartition tp = topicIdPartition . topicPartition ( ) ;
Partition partition = mock ( Partition . class ) ;