@ -22,8 +22,8 @@ from ducktape.utils.util import wait_until
@@ -22,8 +22,8 @@ from ducktape.utils.util import wait_until
from kafkatest . directory_layout . kafka_path import KafkaPathResolverMixin
class StreamsSmoke TestBaseService ( KafkaPathResolverMixin , Service ) :
""" Base class for Streams Smoke Test services providing some common settings and functionality """
class StreamsTestBaseService ( KafkaPathResolverMixin , Service ) :
""" Base class for Streams Test services providing some common settings and functionality """
PERSISTENT_ROOT = " /mnt/streams "
# The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
@ -45,10 +45,11 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
@@ -45,10 +45,11 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
" collect_default " : True } ,
}
def __init__ ( self , context , kafka , command ) :
super ( StreamsSmoke TestBaseService , self ) . __init__ ( context , 1 )
def __init__ ( self , test_ context, kafka , streams_class_name , user_test_args ) :
super ( StreamsTestBaseService , self ) . __init__ ( test_ context, 1 )
self . kafka = kafka
self . args = { ' command ' : command }
self . args = { ' streams_class_name ' : streams_class_name ,
' user_test_args ' : user_test_args }
@property
def node ( self ) :
@ -65,7 +66,7 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
@@ -65,7 +66,7 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
self . stop_node ( node , clean_shutdown )
def stop_node ( self , node , clean_shutdown = True ) :
self . logger . info ( ( clean_shutdown and " Cleanly " or " Forcibly " ) + " stopping Streams Smoke Test on " + str ( node . account ) )
self . logger . info ( ( clean_shutdown and " Cleanly " or " Forcibly " ) + " stopping Streams Test on " + str ( node . account ) )
pids = self . pids ( node )
sig = signal . SIGTERM if clean_shutdown else signal . SIGKILL
@ -73,7 +74,7 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
@@ -73,7 +74,7 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
node . account . signal ( pid , sig , allow_fail = True )
if clean_shutdown :
for pid in pids :
wait_until ( lambda : not node . account . alive ( pid ) , timeout_sec = 60 , err_msg = " Streams Smoke Test process on " + str ( node . account ) + " took too long to exit " )
wait_until ( lambda : not node . account . alive ( pid ) , timeout_sec = 60 , err_msg = " Streams Test process on " + str ( node . account ) + " took too long to exit " )
node . account . ssh ( " rm -f " + self . PID_FILE , allow_fail = False )
@ -95,8 +96,11 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
@@ -95,8 +96,11 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
def wait ( self , timeout_sec = 360 ) :
for node in self . nodes :
for pid in self . pids ( node ) :
wait_until ( lambda : not node . account . alive ( pid ) , timeout_sec = timeout_sec , err_msg = " Streams Smoke Test process on " + str ( node . account ) + " took too long to exit " )
self . wait_node ( node , timeout_sec )
def wait_node ( self , node , timeout_sec = None ) :
for pid in self . pids ( node ) :
wait_until ( lambda : not node . account . alive ( pid ) , timeout_sec = timeout_sec , err_msg = " Streams Test process on " + str ( node . account ) + " took too long to exit " )
def clean_node ( self , node ) :
node . account . kill_process ( " streams " , clean_shutdown = False , allow_fail = True )
@ -105,7 +109,6 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
@@ -105,7 +109,6 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
def start_cmd ( self , node ) :
args = self . args . copy ( )
args [ ' kafka ' ] = self . kafka . bootstrap_servers ( )
args [ ' zk ' ] = self . kafka . zk . connect_setting ( )
args [ ' state_dir ' ] = self . PERSISTENT_ROOT
args [ ' stdout ' ] = self . STDOUT_FILE
args [ ' stderr ' ] = self . STDERR_FILE
@ -114,8 +117,8 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
@@ -114,8 +117,8 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
args [ ' kafka_run_class ' ] = self . path . script ( " kafka-run-class.sh " , node )
cmd = " ( export KAFKA_LOG4J_OPTS= \" -Dlog4j.configuration=file: %(log4j)s \" ; " \
" INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.smoketest.StreamsSmokeTest " \
" %(command)s %( kafka)s %(zk )s %(state_dir)s " \
" INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(state_dir)s %(user_test_args)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s " % args
return cmd
@ -125,24 +128,35 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
@@ -125,24 +128,35 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
node . account . create_file ( self . LOG4J_CONFIG_FILE , self . render ( ' tools_log4j.properties ' , log_file = self . LOG_FILE ) )
self . logger . info ( " Starting StreamsSmoke Test process on " + str ( node . account ) )
self . logger . info ( " Starting StreamsTest process on " + str ( node . account ) )
with node . account . monitor_log ( self . STDOUT_FILE ) as monitor :
node . account . ssh ( self . start_cmd ( node ) )
monitor . wait_until ( ' StreamsSmoke Test instance started ' , timeout_sec = 15 , err_msg = " Never saw message indicating StreamsSmoke Test finished startup on " + str ( node . account ) )
monitor . wait_until ( ' StreamsTest instance started ' , timeout_sec = 60 , err_msg = " Never saw message indicating StreamsTest finished startup on " + str ( node . account ) )
if len ( self . pids ( node ) ) == 0 :
raise RuntimeError ( " No process ids recorded " )
class StreamsSmokeTestBaseService ( StreamsTestBaseService ) :
""" Base class for Streams Smoke Test services providing some common settings and functionality """
def __init__ ( self , test_context , kafka , command ) :
super ( StreamsSmokeTestBaseService , self ) . __init__ ( test_context ,
kafka ,
" org.apache.kafka.streams.smoketest.StreamsSmokeTest " ,
command )
class StreamsSmokeTestDriverService ( StreamsSmokeTestBaseService ) :
def __init__ ( self , context , kafka ) :
super ( StreamsSmokeTestDriverService , self ) . __init__ ( context , kafka , " run " )
def __init__ ( self , test_ context, kafka ) :
super ( StreamsSmokeTestDriverService , self ) . __init__ ( test_ context, kafka , " run " )
class StreamsSmokeTestJobRunnerService ( StreamsSmokeTestBaseService ) :
def __init__ ( self , context , kafka ) :
super ( StreamsSmokeTestJobRunnerService , self ) . __init__ ( context , kafka , " process " )
def __init__ ( self , test_context , kafka ) :
super ( StreamsSmokeTestJobRunnerService , self ) . __init__ ( test_context , kafka , " process " )
class StreamsSmokeTestShutdownDeadlockService ( StreamsSmokeTestBaseService ) :
def __init__ ( self , context , kafka ) :
super ( StreamsSmokeTestShutdownDeadlockService , self ) . __init__ ( context , kafka , " close-deadlock-test " )
def __init__ ( self , test_ context, kafka ) :
super ( StreamsSmokeTestShutdownDeadlockService , self ) . __init__ ( test_ context, kafka , " close-deadlock-test " )