@ -112,6 +112,8 @@ class StreamsBrokerBounceTest(Test):
@@ -112,6 +112,8 @@ class StreamsBrokerBounceTest(Test):
' wcnt ' : { ' partitions ' : self . partitions , ' replication-factor ' : self . replication ,
' configs ' : { " min.insync.replicas " : 2 } } ,
' tagg ' : { ' partitions ' : self . partitions , ' replication-factor ' : self . replication ,
' configs ' : { " min.insync.replicas " : 2 } } ,
' __consumer_offsets ' : { ' partitions ' : 50 , ' replication-factor ' : self . replication ,
' configs ' : { " min.insync.replicas " : 2 } }
}
@ -132,21 +134,21 @@ class StreamsBrokerBounceTest(Test):
@@ -132,21 +134,21 @@ class StreamsBrokerBounceTest(Test):
signal_node ( self , self . kafka . nodes [ num ] , sig )
def setup_system ( self ) :
# Setup phase
def setup_system ( self , start_processor = True ) :
# Setup phase
self . zk = ZookeeperService ( self . test_context , num_nodes = 1 )
self . zk . start ( )
self . kafka = KafkaService ( self . test_context , num_nodes = self . replication ,
zk = self . zk , topics = self . topics )
self . kafka = KafkaService ( self . test_context , num_nodes = self . replication , zk = self . zk , topics = self . topics )
self . kafka . start ( )
# Start test harness
self . driver = StreamsSmokeTestDriverService ( self . test_context , self . kafka )
self . processor1 = StreamsSmokeTestJobRunnerService ( self . test_context , self . kafka )
self . driver . start ( )
self . processor1 . start ( )
if ( start_processor ) :
self . processor1 . start ( )
def collect_results ( self , sleep_time_secs ) :
data = { }
@ -210,7 +212,7 @@ class StreamsBrokerBounceTest(Test):
@@ -210,7 +212,7 @@ class StreamsBrokerBounceTest(Test):
Streams should throw an exception since it cannot create topics with the desired
replication factor of 3
"""
self . setup_system ( )
self . setup_system ( start_processor = False )
# Sleep to allow test to run for a bit
time . sleep ( sleep_time_secs )
@ -218,6 +220,8 @@ class StreamsBrokerBounceTest(Test):
@@ -218,6 +220,8 @@ class StreamsBrokerBounceTest(Test):
# Fail brokers
self . fail_broker_type ( failure_mode , broker_type )
self . processor1 . start ( )
return self . collect_results ( sleep_time_secs )
@cluster ( num_nodes = 7 )