@ -19,11 +19,12 @@ package kafka.coordinator.group
@@ -19,11 +19,12 @@ package kafka.coordinator.group
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Collections
import java.util.Optional
import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName
import kafka.api._
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics. { JmxReporter , Metrics => kMetrics }
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
@ -56,6 +58,7 @@ class GroupMetadataManagerTest {
@@ -56,6 +58,7 @@ class GroupMetadataManagerTest {
var zkClient : KafkaZkClient = null
var partition : Partition = null
var defaultOffsetRetentionMs = Long . MaxValue
var metrics : kMetrics = null
val groupId = "foo"
val groupInstanceId = Some ( "bar" )
@ -87,9 +90,10 @@ class GroupMetadataManagerTest {
@@ -87,9 +90,10 @@ class GroupMetadataManagerTest {
EasyMock . expect ( zkClient . getTopicPartitionCount ( Topic . GROUP_METADATA_TOPIC_NAME ) ) . andReturn ( Some ( 2 ) )
EasyMock . replay ( zkClient )
metrics = new kMetrics ( )
time = new MockTime
replicaManager = EasyMock . createNiceMock ( classOf [ ReplicaManager ] )
groupMetadataManager = new GroupMetadataManager ( 0 , ApiVersion . latestVersion , offsetConfig , replicaManager , zkClient , time )
groupMetadataManager = new GroupMetadataManager ( 0 , ApiVersion . latestVersion , offsetConfig , replicaManager , zkClient , time , metrics )
partition = EasyMock . niceMock ( classOf [ Partition ] )
}
@ -2051,4 +2055,43 @@ class GroupMetadataManagerTest {
@@ -2051,4 +2055,43 @@ class GroupMetadataManagerTest {
group . transitionTo ( CompletingRebalance )
expectMetrics ( groupMetadataManager , 1 , 0 , 1 )
}
@Test
def testPartitionLoadMetric ( ) : Unit = {
val server = ManagementFactory . getPlatformMBeanServer
val mBeanName = "kafka.server:type=group-coordinator-metrics"
val reporter = new JmxReporter ( "kafka.server" )
metrics . addReporter ( reporter )
def partitionLoadTime ( attribute : String ) : Double = {
server . getAttribute ( new ObjectName ( mBeanName ) , attribute ) . asInstanceOf [ Double ]
}
assertTrue ( server . isRegistered ( new ObjectName ( mBeanName ) ) )
assertEquals ( Double . NaN , partitionLoadTime ( "partition-load-time-max" ) , 0 )
assertEquals ( Double . NaN , partitionLoadTime ( "partition-load-time-avg" ) , 0 )
assertTrue ( reporter . containsMbean ( mBeanName ) )
val groupMetadataTopicPartition = groupTopicPartition
val startOffset = 15L
val memberId = "98098230493"
val committedOffsets = Map (
new TopicPartition ( "foo" , 0 ) -> 23L ,
new TopicPartition ( "foo" , 1 ) -> 455L ,
new TopicPartition ( "bar" , 0 ) -> 8992L
)
val offsetCommitRecords = createCommittedOffsetRecords ( committedOffsets )
val groupMetadataRecord = buildStableGroupRecordWithMember ( generation = 15 ,
protocolType = "consumer" , protocol = "range" , memberId )
val records = MemoryRecords . withRecords ( startOffset , CompressionType . NONE ,
( offsetCommitRecords ++ Seq ( groupMetadataRecord ) ) . toArray : _ * )
expectGroupMetadataLoad ( groupMetadataTopicPartition , startOffset , records )
EasyMock . replay ( replicaManager )
groupMetadataManager . loadGroupsAndOffsets ( groupMetadataTopicPartition , _ => ( ) )
assertTrue ( partitionLoadTime ( "partition-load-time-max" ) >= 0.0 )
assertTrue ( partitionLoadTime ( "partition-load-time-avg" ) >= 0.0 )
}
}