@ -258,7 +258,7 @@ object TestUtils extends Logging {
@@ -258,7 +258,7 @@ object TestUtils extends Logging {
numPartitions : Int = 1 ,
replicationFactor : Int = 1 ,
servers : Seq [ KafkaServer ] ,
topicConfig : Properties = new Properties ) : scala.collection.immutable.Map [ Int , Option [ Int ] ] = {
topicConfig : Properties = new Properties ) : scala.collection.immutable.Map [ Int , Int ] = {
// create topic
AdminUtils . createTopic ( zkUtils , topic , numPartitions , replicationFactor , topicConfig )
// wait until the update metadata request for new topic reaches all servers
@ -274,7 +274,7 @@ object TestUtils extends Logging {
@@ -274,7 +274,7 @@ object TestUtils extends Logging {
* Return the leader for each partition .
*/
def createTopic ( zkUtils : ZkUtils , topic : String , partitionReplicaAssignment : collection.Map [ Int , Seq [ Int ] ] ,
servers : Seq [ KafkaServer ] ) : scala.collection.immutable.Map [ Int , Option [ Int ] ] = {
servers : Seq [ KafkaServer ] ) : scala.collection.immutable.Map [ Int , Int ] = {
// create topic
AdminUtils . createOrUpdateTopicPartitionAssignmentPathInZK ( zkUtils , topic , partitionReplicaAssignment )
// wait until the update metadata request for new topic reaches all servers
@ -744,46 +744,55 @@ object TestUtils extends Logging {
@@ -744,46 +744,55 @@ object TestUtils extends Logging {
* If oldLeaderOpt is defined , it waits until the new leader is different from the old leader .
* If newLeaderOpt is defined , it waits until the new leader becomes the expected new leader .
*
* @return The new leader or assertion failure if timeout is reached .
* @return The new leader ( note that negative values are used to indicate conditions like NoLeader and
* LeaderDuringDelete ) .
* @throws AssertionError if the expected condition is not true within the timeout .
*/
def waitUntilLeaderIsElectedOrChanged ( zkUtils : ZkUtils , topic : String , partition : Int ,
timeoutMs : Long = 30000 ,
oldLeaderOpt : Option [ Int ] = None , newLeaderOpt : Option [ Int ] = None ) : Option [ Int ] = {
def waitUntilLeaderIsElectedOrChanged ( zkUtils : ZkUtils , topic : String , partition : Int , timeoutMs : Long = 30000L ,
oldLeaderOpt : Option [ Int ] = None , newLeaderOpt : Option [ Int ] = None ) : Int = {
require ( ! ( oldLeaderOpt . isDefined && newLeaderOpt . isDefined ) , "Can't define both the old and the new leader" )
val startTime = System . currentTimeMillis ( )
var isLeaderElectedOrChanged = false
val topicPartition = new TopicPartition ( topic , partition )
trace ( "Waiting for leader to be elected or changed for partition [%s,%d], older leader is %s, new leader is %s"
. format ( topic , partition , oldLeaderOpt , newLeaderOpt ) )
trace ( s "Waiting for leader to be elected or changed for partition $topicPartition , old leader is $oldLeaderOpt , " +
s" new leader is $newLeaderOpt " )
var leader : Option [ Int ] = None
while ( ! isLeaderElectedOrChanged && System . currentTimeMillis ( ) < startTime + timeoutMs ) {
var electedOrChangedLeader : Option [ Int ] = None
while ( electedOrChangedLeader . isEmpty && System . currentTimeMillis ( ) < startTime + timeoutMs ) {
// check if leader is elected
leader = zkUtils . getLeaderForPartition ( topic , partition )
leader match {
case Some ( l ) =>
if ( newLeaderOpt . isDefined && newLeaderOpt . get == l ) {
trace ( "Expected new leader %d is elected for partition [%s,%d]" . format ( l , topic , partition ) )
isLeaderElectedOrChanged = true
} else if ( oldLeaderOpt . isDefined && oldLeaderOpt . get != l ) {
trace ( "Leader for partition [%s,%d] is changed from %d to %d" . format ( topic , partition , oldLeaderOpt . get , l ) )
isLeaderElectedOrChanged = true
} else if ( oldLeaderOpt . isEmpty ) {
trace ( "Leader %d is elected for partition [%s,%d]" . format ( l , topic , partition ) )
isLeaderElectedOrChanged = true
} else {
trace ( "Current leader for partition [%s,%d] is %d" . format ( topic , partition , l ) )
}
case Some ( l ) => ( newLeaderOpt , oldLeaderOpt ) match {
case ( Some ( newLeader ) , _ ) if newLeader == l =>
trace ( s" Expected new leader $l is elected for partition $topicPartition " )
electedOrChangedLeader = leader
case ( _ , Some ( oldLeader ) ) if oldLeader != l =>
trace ( s" Leader for partition $topicPartition is changed from $oldLeader to $l " )
electedOrChangedLeader = leader
case ( None , None ) =>
trace ( s" Leader $l is elected for partition $topicPartition " )
electedOrChangedLeader = leader
case _ =>
trace ( s" Current leader for partition $topicPartition is $l " )
}
case None =>
trace ( "Leader for partition [%s,%d] is not elected yet" . format ( topic , partition ) )
trace ( s" Leader for partition $topicPartition is not elected yet " )
}
Thread . sleep ( timeoutMs . min ( 100L ) )
Thread . sleep ( math . min ( timeoutMs , 100L ) )
}
electedOrChangedLeader . getOrElse {
val errorMessage = ( newLeaderOpt , oldLeaderOpt ) match {
case ( Some ( newLeader ) , _ ) =>
s" Timing out after $timeoutMs ms since expected new leader $newLeader was not elected for partition $topicPartition , leader is $leader "
case ( _ , Some ( oldLeader ) ) =>
s" Timing out after $timeoutMs ms since a new leader that is different from $oldLeader was not elected for partition $topicPartition , " +
s" leader is $leader "
case _ =>
s" Timing out after $timeoutMs ms since a leader was not elected for partition $topicPartition "
}
fail ( errorMessage )
}
if ( ! isLeaderElectedOrChanged )
fail ( "Timing out after %d ms since leader is not elected or changed for partition [%s,%d]"
. format ( timeoutMs , topic , partition ) )
leader
}
/* *