@ -13,20 +13,28 @@
@@ -13,20 +13,28 @@
package kafka.api
import java.time
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util. { Collection , Collections , Properties }
import util.control.Breaks._
import kafka.server. { BaseRequestTest , KafkaConfig }
import kafka.utils. { CoreUtils , Logging , ShutdownableThread , TestUtils }
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer. { KafkaProducer , ProducerRecord }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.GroupMaxSizeReachedException
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests. { FindCoordinatorRequest , FindCoordinatorResponse }
import org.junit.Assert._
import org.junit. { After , Before , Ignore , Test }
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.concurrent. { Await , ExecutionContext , ExecutionContextExecutor , Future => SFuture }
/* *
* Integration tests for the consumer that cover basic usage as well as server failures
@ -35,17 +43,23 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
@@ -35,17 +43,23 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
val topic = "topic"
val part = 0
val tp = new TopicPartition ( topic , part )
val maxGroupSize = 5
// Time to process commit and leave group requests in tests when brokers are available
val gracefulCloseTimeMs = 1000
val executor = Executors . newScheduledThreadPool ( 2 )
override def generateConfigs = {
generateKafkaConfigs ( )
private def generateKafkaConfigs ( maxGroupSize : String = maxGroupSize . toString ) : Seq [ KafkaConfig ] = {
val properties = new Properties
properties . put ( KafkaConfig . OffsetsTopicReplicationFactorProp , "3" ) // don 't want to lose offset
properties . put ( KafkaConfig . OffsetsTopicPartitionsProp , "1" )
properties . put ( KafkaConfig . GroupMinSessionTimeoutMsProp , "10" ) // set small enough session timeout
properties . put ( KafkaConfig . GroupInitialRebalanceDelayMsProp , "0" )
properties . put ( KafkaConfig . GroupMaxSizeProp , maxGroupSize )
properties . put ( KafkaConfig . UncleanLeaderElectionEnableProp , "true" )
properties . put ( KafkaConfig . AutoCreateTopicsEnableProp , "false" )
@ -188,14 +202,14 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
@@ -188,14 +202,14 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
sendRecords ( numRecords , newtopic )
receiveRecords ( consumer , numRecords , newtopic , 10000 )
receiveRecords ( consumer , numRecords , 10000 )
servers . foreach ( server => killBroker ( server . config . brokerId ) )
Thread . sleep ( 500 )
restartDeadBrokers ( )
val future = executor . submit ( new Runnable {
def run ( ) = receiveRecords ( consumer , numRecords , newtopic , 10000 )
def run ( ) = receiveRecords ( consumer , numRecords , 10000 )
} )
sendRecords ( numRecords , newtopic )
future . get
@ -276,6 +290,166 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
@@ -276,6 +290,166 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
future2 . get
/* *
* If we have a running consumer group of size N , configure consumer . group . max . size = N - 1 and restart all brokers ,
* the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config .
* Then , 1 consumer should be left out of the group .
def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup ( ) : Unit = {
val topic = "group-max-size-test"
val maxGroupSize = 2
val consumerCount = maxGroupSize + 1
var recordsProduced = maxGroupSize * 100
val partitionCount = consumerCount * 2
if ( recordsProduced % partitionCount != 0 ) {
// ensure even record distribution per partition
recordsProduced += partitionCount - recordsProduced % partitionCount
val executor = Executors . newScheduledThreadPool ( consumerCount * 2 )
this . consumerConfig . setProperty ( ConsumerConfig . MAX_POLL_INTERVAL_MS_CONFIG , "60000" )
this . consumerConfig . setProperty ( ConsumerConfig . HEARTBEAT_INTERVAL_MS_CONFIG , "1000" )
this . consumerConfig . setProperty ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , "false" )
val producer = createProducer ( )
createTopic ( topic , numPartitions = partitionCount , replicationFactor = numBrokers )
val stableConsumers = createConsumersWithGroupId ( "group2" , consumerCount , executor , topic = topic )
// assert group is stable and working
sendRecords ( producer , recordsProduced , topic , numPartitions = Some ( partitionCount ) )
stableConsumers . foreach { cons => {
receiveAndCommit ( cons , recordsProduced / consumerCount , 10000 )
} }
// roll all brokers with a lesser max group size to make sure coordinator has the new config
val newConfigs = generateKafkaConfigs ( maxGroupSize . toString )
val kickedConsumerOut = new AtomicBoolean ( false )
var kickedOutConsumerIdx : Option [ Int ] = None
val lock = new ReentrantLock
// restart brokers until the group moves to a Coordinator with the new config
breakable { for ( broker <- servers . indices ) {
killBroker ( broker )
sendRecords ( producer , recordsProduced , topic , numPartitions = Some ( partitionCount ) )
var successfulConsumes = 0
// compute consumptions in a non - blocking way in order to account for the rebalance once the group . size takes effect
val consumeFutures = new ArrayBuffer [ SFuture [ Any ] ]
implicit val executorContext : ExecutionContextExecutor = ExecutionContext . fromExecutor ( executor )
stableConsumers . indices . foreach ( idx => {
val currentConsumer = stableConsumers ( idx )
val consumeFuture = SFuture {
try {
receiveAndCommit ( currentConsumer , recordsProduced / consumerCount , 10000 )
CoreUtils . inLock ( lock ) { successfulConsumes += 1 }
} catch {
case e : Throwable =>
if ( ! e . isInstanceOf [ GroupMaxSizeReachedException ] ) {
throw e
if ( ! kickedConsumerOut . compareAndSet ( false , true ) ) {
fail ( s" Received more than one ${ classOf [ GroupMaxSizeReachedException ] } " )
kickedOutConsumerIdx = Some ( idx )
consumeFutures += consumeFuture
} )
Await . result ( SFuture . sequence ( consumeFutures ) , Duration ( "12sec" ) )
if ( kickedConsumerOut . get ( ) ) {
// validate the rest N - 1 consumers consumed successfully
assertEquals ( maxGroupSize , successfulConsumes )
val config = newConfigs ( broker )
servers ( broker ) = TestUtils . createServer ( config , time = brokerTime ( config . brokerId ) )
restartDeadBrokers ( )
} }
if ( ! kickedConsumerOut . get ( ) )
fail ( s" Should have received an ${ classOf [ GroupMaxSizeReachedException ] } during the cluster roll " )
// assert that the group has gone through a rebalance and shed off one consumer
stableConsumers . remove ( kickedOutConsumerIdx . get )
sendRecords ( producer , recordsProduced , topic , numPartitions = Some ( partitionCount ) )
// should be only maxGroupSize consumers left in the group
stableConsumers . foreach { cons => {
receiveAndCommit ( cons , recordsProduced / maxGroupSize , 10000 )
} }
/* *
* When we have the consumer group max size configured to X , the X + 1 th consumer trying to join should receive a fatal exception
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize ( ) : Unit = {
val topic = "group-max-size-test"
val groupId = "group1"
val executor = Executors . newScheduledThreadPool ( maxGroupSize * 2 )
createTopic ( topic , maxGroupSize , numBrokers )
this . consumerConfig . setProperty ( ConsumerConfig . MAX_POLL_INTERVAL_MS_CONFIG , "60000" )
this . consumerConfig . setProperty ( ConsumerConfig . HEARTBEAT_INTERVAL_MS_CONFIG , "1000" )
this . consumerConfig . setProperty ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , "false" )
// Create N + 1 consumers in the same consumer group and assert that the N + 1 th consumer receives a fatal error when it tries to join the group
val stableConsumers = createConsumersWithGroupId ( groupId , maxGroupSize , executor , topic )
val newConsumer = createConsumerWithGroupId ( groupId )
var failedRebalance = false
var exception : Exception = null
waitForRebalance ( 5000 , subscribeAndPoll ( newConsumer , executor = executor , onException = e => { failedRebalance = true ; exception = e } ) ,
executor = executor , stableConsumers : _ * )
assertTrue ( "Rebalance did not fail as expected" , failedRebalance )
assertTrue ( exception . isInstanceOf [ GroupMaxSizeReachedException ] )
// assert group continues to live
val producer = createProducer ( )
sendRecords ( producer , maxGroupSize * 100 , topic , numPartitions = Some ( maxGroupSize ) )
stableConsumers . foreach { cons => {
receiveExactRecords ( cons , 100 , 10000 )
} }
/* *
* Creates N consumers with the same group ID and ensures the group rebalances properly at each step
private def createConsumersWithGroupId ( groupId : String , consumerCount : Int , executor : ExecutorService , topic : String = topic ) : ArrayBuffer [ KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] ] = {
val stableConsumers = ArrayBuffer [ KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] ] ( )
for ( _ <- 1. to ( consumerCount ) ) {
val newConsumer = createConsumerWithGroupId ( groupId )
waitForRebalance ( 5000 , subscribeAndPoll ( newConsumer , executor = executor , topic = topic ) ,
executor = executor , stableConsumers : _ * )
stableConsumers += newConsumer
def subscribeAndPoll ( consumer : KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] , executor : ExecutorService , revokeSemaphore : Option [ Semaphore ] = None ,
onException : Exception => Unit = e => { throw e } , topic : String = topic , pollTimeout : Int = 1000 ) : Future [ Any ] = {
executor . submit ( CoreUtils . runnable {
try {
consumer . subscribe ( Collections . singletonList ( topic ) )
consumer . poll ( java . time . Duration . ofMillis ( pollTimeout ) )
} catch {
case e : Exception => onException . apply ( e )
} , 0 )
def waitForRebalance ( timeoutMs : Long , future : Future [ Any ] , executor : ExecutorService , otherConsumers : KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] * ) {
val startMs = System . currentTimeMillis
implicit val executorContext : ExecutionContextExecutor = ExecutionContext . fromExecutor ( executor )
while ( System . currentTimeMillis < startMs + timeoutMs && ! future . isDone ) {
val consumeFutures = otherConsumers . map ( consumer => SFuture {
consumer . poll ( time . Duration . ofMillis ( 1000 ) )
} )
Await . result ( SFuture . sequence ( consumeFutures ) , Duration ( "1500ms" ) )
assertTrue ( "Rebalance did not complete in time" , future . isDone )
/* *
* Consumer is closed during rebalance . Close should leave group and close
* immediately if rebalance is in progress . If brokers are not available ,
@ -323,7 +497,6 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
@@ -323,7 +497,6 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
assertFalse ( "Rebalance completed too early" , future . isDone )
val consumer1 = createConsumerWithGroupId ( groupId )
waitForRebalance ( 2000 , subscribeAndPoll ( consumer1 ) )
val consumer2 = createConsumerWithGroupId ( groupId )
@ -360,18 +533,31 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
@@ -360,18 +533,31 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
consumer . assign ( Collections . singleton ( tp ) )
consumer . subscribe ( Collections . singleton ( topic ) )
receiveRecords ( consumer , numRecords )
receiveExact Records ( consumer , numRecords )
private def receiveRecords ( consumer : KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] , numRecords : Int , topic : String = this . topic , t imeoutMs : Long = 60000 ) {
private def receiveRecords ( consumer : KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] , numRecords : Int , timeoutMs : Long = 60000 ) : Long = {
var received = 0L
val endTimeMs = System . currentTimeMillis + timeoutMs
while ( received < numRecords && System . currentTimeMillis < endTimeMs )
received += consumer . poll ( 1000 ) . count ( )
received += consumer . poll ( time . Duration . ofMillis ( 100 ) ) . count ( )
private def receiveExactRecords ( consumer : KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] , numRecords : Int , timeoutMs : Long = 60000 ) : Unit = {
val received = receiveRecords ( consumer , numRecords , timeoutMs )
assertEquals ( numRecords , received )
@throws ( classOf [ CommitFailedException ] )
private def receiveAndCommit ( consumer : KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] , numRecords : Int , timeoutMs : Long ) : Unit = {
val received = receiveRecords ( consumer , numRecords , timeoutMs )
assertTrue ( s" Received $received , expected at least $numRecords " , numRecords <= received )
consumer . commitSync ( )
private def submitCloseAndValidate ( consumer : KafkaConsumer [ Array [ Byte ] , Array [ Byte ] ] ,
closeTimeoutMs : Long , minCloseTimeMs : Option [ Long ] , maxCloseTimeMs : Option [ Long ] ) : Future [ Any ] = {
executor . submit ( CoreUtils . runnable {
@ -427,9 +613,21 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
@@ -427,9 +613,21 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
private def sendRecords ( producer : KafkaProducer [ Array [ Byte ] , Array [ Byte ] ] ,
numRecords : Int ,
topic : String = this . topic ) {
topic : String = this . topic ,
numPartitions : Option [ Int ] = None ) {
var partitionIndex = 0
def getPartition : Int = {
numPartitions match {
case Some ( partitions ) =>
val nextPart = partitionIndex % partitions
partitionIndex += 1
case None => part
val futures = ( 0 until numRecords ) . map { i =>
producer . send ( new ProducerRecord ( topic , part , i . toString . getBytes , i . toString . getBytes ) )
producer . send ( new ProducerRecord ( topic , getPartition , i . toString . getBytes , i . toString . getBytes ) )
futures . map ( _ . get )