@@ -47,53 +47,35 @@ class TransactionsTest(Test):
self . num_output_partitions = 3
self . num_seed_messages = 100000
self . transaction_size = 750
self . first_transactional_id = " my-first-transactional-id "
self . second_transactional_id = " my-second-transactional-id "
self . consumer_group = " transactions-test-consumer-group "
self . zk = ZookeeperService ( test_context , num_nodes = 1 )
self . kafka = KafkaService ( test_context ,
num_nodes = self . num_brokers ,
zk = self . zk ,
topics = {
self . input_topic : {
" partitions " : self . num_input_partitions ,
" replication-factor " : 3 ,
" configs " : {
" min.insync.replicas " : 2
}
} ,
self . output_topic : {
" partitions " : self . num_output_partitions ,
" replication-factor " : 3 ,
" configs " : {
" min.insync.replicas " : 2
}
}
} )
zk = self . zk )
def setUp(self):
    """Start the ZooKeeper service before each test case runs."""
    self.zk.start()
def seed_messages(self, topic, num_seed_messages):
    """Seed `topic` with `num_seed_messages` integer messages using an
    idempotent producer.

    :param topic: the topic to produce the seed messages to.
    :param num_seed_messages: how many messages to produce and await acks for.
    :return: the list of acked messages.

    Fails the test (via wait_until) if the expected number of acks is not
    reached within the timeout.
    """
    seed_timeout_sec = 10000
    seed_producer = VerifiableProducer(context=self.test_context,
                                       num_nodes=1,
                                       kafka=self.kafka,
                                       topic=topic,
                                       message_validator=is_int,
                                       max_messages=num_seed_messages,
                                       enable_idempotence=True)
    seed_producer.start()
    # NOTE: the error message must report the same count the condition waits
    # on -- the `num_seed_messages` parameter, not the stale instance field.
    wait_until(lambda: seed_producer.num_acked >= num_seed_messages,
               timeout_sec=seed_timeout_sec,
               err_msg="Producer failed to produce messages %d in %ds." % \
               (num_seed_messages, seed_timeout_sec))
    return seed_producer.acked
def get_messages_from_topic(self, topic, num_messages):
    """Consume and return `num_messages` messages from `topic` using a
    fresh verifying consumer group."""
    consumer = self.start_consumer(topic, group_id="verifying_consumer")
    return self.drain_consumer(consumer, num_messages)
def bounce_brokers ( self , clean_shutdown ) :
for node in self . kafka . nodes :
@@ -107,16 +89,16 @@ class TransactionsTest(Test):
hard - killed broker % s " % s tr(node.account))
self . kafka . start_node ( node )
def create_and_start_message_copier ( self , input_partition , transactional_id ) :
def create_and_start_message_copier ( self , input_topic , input_ partition , output_topic , transactional_id ) :
message_copier = TransactionalMessageCopier (
context = self . test_context ,
num_nodes = 1 ,
kafka = self . kafka ,
transactional_id = transactional_id ,
consumer_group = self . consumer_group ,
input_topic = self . input_topic ,
input_topic = input_topic ,
input_partition = input_partition ,
output_topic = self . output_topic ,
output_topic = output_topic ,
max_messages = - 1 ,
transaction_size = self . transaction_size
)
@@ -137,16 +119,15 @@ class TransactionsTest(Test):
str ( copier . progress_percent ( ) ) ) )
copier . restart ( clean_shutdown )
def create_and_start_copiers(self, input_topic, output_topic, num_copiers):
    """Start `num_copiers` transactional message copiers, one per input
    partition, copying from `input_topic` to `output_topic`.

    Each copier gets its own partition (partition i for copier i) and a
    unique transactional id, so the copiers never conflict.

    :return: the list of started copier services.
    """
    return [
        self.create_and_start_message_copier(
            input_topic=input_topic,
            output_topic=output_topic,
            input_partition=i,
            transactional_id="copier-" + str(i)
        )
        for i in range(0, num_copiers)
    ]
def start_consumer ( self , topic_to_read , group_id ) :
@@ -167,7 +148,7 @@ class TransactionsTest(Test):
60 )
return consumer
def drain_consumer(self, consumer, num_messages):
    """Wait until `consumer` has read at least `num_messages`, then stop it
    and return everything it consumed.

    This is a safe check because both failure modes will be caught:

    1. If we consume `num_messages` but there are duplicates, then the
       caller's set-equality assertions against the input will fail.

    2. If we never reach `num_messages`, the wait below times out and
       fails the test.
    """
    wait_until(lambda: len(consumer.messages_consumed[1]) >= num_messages,
               timeout_sec=90,
               err_msg="Consumer consumed only %d out of %d messages in %ds" % \
               (len(consumer.messages_consumed[1]), num_messages, 90))
    consumer.stop()
    return consumer.messages_consumed[1]
def copy_messages_transactionally ( self , failure_mode , bounce_target ) :
def copy_messages_transactionally ( self , failure_mode , bounce_target ,
input_topic , output_topic ,
num_copiers , num_messages_to_copy ) :
""" Copies messages transactionally from the seeded input topic to the
output topic , either bouncing brokers or clients in a hard and soft
way as it goes .
@@ -192,8 +175,10 @@ class TransactionsTest(Test):
It returns the concurrently consumed messages .
"""
copiers = self . create_and_start_copiers ( )
concurrent_consumer = self . start_consumer ( self . output_topic ,
copiers = self . create_and_start_copiers ( input_topic = input_topic ,
output_topic = output_topic ,
num_copiers = num_copiers )
concurrent_consumer = self . start_consumer ( output_topic ,
group_id = " concurrent_consumer " )
clean_shutdown = False
if failure_mode == " clean_bounce " :
@@ -210,22 +195,57 @@ class TransactionsTest(Test):
err_msg = " %s - Failed to copy all messages in %d s. " % \
( copier . transactional_id , 120 ) )
self . logger . info ( " finished copying messages " )
return self . drain_consumer ( concurrent_consumer )
return self . drain_consumer ( concurrent_consumer , num_messages_to_copy )
def setup_topics(self):
    """Declare the input and output topics on the Kafka service.

    Uses the current values of `num_input_partitions` and
    `num_output_partitions`, so callers may adjust those before invoking
    this (and before starting Kafka). Both topics are replicated 3 ways
    with min.insync.replicas=2 so that transactions survive a single
    broker bounce.
    """
    common_config = {"min.insync.replicas": 2}
    self.kafka.topics = {
        self.input_topic: {
            "partitions": self.num_input_partitions,
            "replication-factor": 3,
            "configs": dict(common_config)
        },
        self.output_topic: {
            "partitions": self.num_output_partitions,
            "replication-factor": 3,
            "configs": dict(common_config)
        }
    }
@cluster ( num_nodes = 9 )
@matrix ( failure_mode = [ " hard_bounce " , " clean_bounce " ] ,
bounce_target = [ " brokers " , " clients " ] )
def test_transactions ( self , failure_mode , bounce_target ) :
bounce_target = [ " brokers " , " clients " ] ,
check_order = [ True , False ] )
def test_transactions ( self , failure_mode , bounce_target , check_order ) :
security_protocol = ' PLAINTEXT '
self . kafka . security_protocol = security_protocol
self . kafka . interbroker_security_protocol = security_protocol
self . kafka . logs [ " kafka_data_1 " ] [ " collect_default " ] = True
self . kafka . logs [ " kafka_data_2 " ] [ " collect_default " ] = True
self . kafka . logs [ " kafka_operational_logs_debug " ] [ " collect_default " ] = True
if check_order :
# To check ordering, we simply create input and output topics
# with a single partition.
# We reduce the number of seed messages to copy to account for the fewer output
# partitions, and thus lower parallelism. This helps keep the test
# time shorter.
self . num_seed_messages = self . num_seed_messages / 3
self . num_input_partitions = 1
self . num_output_partitions = 1
self . setup_topics ( )
self . kafka . start ( )
input_messages = self . seed_messages ( )
concurrently_consumed_messages = self . copy_messages_transactionally ( failure_mode , bounce_target )
output_messages = self . get_messages_from_output_topic ( )
input_messages = self . seed_messages ( self . input_topic , self . num_seed_messages )
concurrently_consumed_messages = self . copy_messages_transactionally (
failure_mode , bounce_target , input_topic = self . input_topic ,
output_topic = self . output_topic , num_copiers = self . num_input_partitions ,
num_messages_to_copy = self . num_seed_messages )
output_messages = self . get_messages_from_topic ( self . output_topic , self . num_seed_messages )
concurrently_consumed_message_set = set ( concurrently_consumed_messages )
output_message_set = set ( output_messages )
@@ -242,3 +262,7 @@ class TransactionsTest(Test):
assert input_message_set == concurrently_consumed_message_set , \
" Input and concurrently consumed output message sets are not equal. Num input messages: %d . Num concurrently_consumed_messages: %d " % \
( len ( input_message_set ) , len ( concurrently_consumed_message_set ) )
if check_order :
assert input_messages == sorted ( input_messages ) , " The seed messages themselves were not in order "
assert output_messages == input_messages , " Output messages are not in order "
assert concurrently_consumed_messages == output_messages , " Concurrently consumed messages are not in order "