@ -20,6 +20,7 @@ import signal
@@ -20,6 +20,7 @@ import signal
import time
import requests
from ducktape . cluster . remoteaccount import RemoteCommandError
from ducktape . errors import DucktapeError
from ducktape . services . service import Service
from ducktape . utils . util import wait_until
@ -39,6 +40,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
@@ -39,6 +40,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
STDERR_FILE = os . path . join ( PERSISTENT_ROOT , " connect.stderr " )
LOG4J_CONFIG_FILE = os . path . join ( PERSISTENT_ROOT , " connect-log4j.properties " )
PID_FILE = os . path . join ( PERSISTENT_ROOT , " connect.pid " )
CONNECT_REST_PORT = 8083
# Currently the Connect worker supports waiting on three modes:
STARTUP_MODE_INSTANT = ' INSTANT '
""" STARTUP_MODE_INSTANT: Start Connect worker and return immediately """
STARTUP_MODE_LOAD = ' LOAD '
""" STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins """
STARTUP_MODE_LISTEN = ' LISTEN '
""" STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port. """
logs = {
" connect_log " : {
@ -57,6 +67,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
@@ -57,6 +67,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
self . kafka = kafka
self . security_config = kafka . security_config . client_config ( )
self . files = files
self . startup_mode = self . STARTUP_MODE_LISTEN
self . environment = { }
def pids ( self , node ) :
@ -76,6 +87,38 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
@@ -76,6 +87,38 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
self . config_template_func = config_template_func
self . connector_config_templates = connector_config_templates
def listening ( self , node ) :
try :
cmd = " nc -z %s %s " % ( node . account . hostname , self . CONNECT_REST_PORT )
node . account . ssh_output ( cmd , allow_fail = False )
self . logger . debug ( " Connect worker started accepting connections at: ' %s : %s ' ) " , node . account . hostname ,
self . CONNECT_REST_PORT )
return True
except ( RemoteCommandError , ValueError ) as e :
return False
def start ( self , mode = STARTUP_MODE_LISTEN ) :
self . startup_mode = mode
super ( ConnectServiceBase , self ) . start ( )
def start_and_return_immediately ( self , node , worker_type , remote_connector_configs ) :
cmd = self . start_cmd ( node , remote_connector_configs )
self . logger . debug ( " Connect %s command: %s " , worker_type , cmd )
node . account . ssh ( cmd )
def start_and_wait_to_load_plugins ( self , node , worker_type , remote_connector_configs ) :
with node . account . monitor_log ( self . LOG_FILE ) as monitor :
self . start_and_return_immediately ( node , worker_type , remote_connector_configs )
monitor . wait_until ( ' Kafka version ' , timeout_sec = 60 ,
err_msg = " Never saw message indicating Kafka Connect finished startup on node: " +
" %s in condition mode: %s " % ( str ( node . account ) , self . startup_mode ) )
def start_and_wait_to_start_listening ( self , node , worker_type , remote_connector_configs ) :
self . start_and_return_immediately ( node , worker_type , remote_connector_configs )
wait_until ( lambda : self . listening ( node ) , timeout_sec = 60 ,
err_msg = " Kafka Connect failed to start on node: %s in condition mode: %s " %
( str ( node . account ) , self . startup_mode ) )
def stop_node ( self , node , clean_shutdown = True ) :
self . logger . info ( ( clean_shutdown and " Cleanly " or " Forcibly " ) + " stopping Kafka Connect on " + str ( node . account ) )
pids = self . pids ( node )
@ -192,7 +235,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
@@ -192,7 +235,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
raise exception_to_throw
def _base_url ( self , node ) :
return ' http:// ' + node . account . externally_routable_ip + ' : ' + ' 8083 '
return ' http:// ' + node . account . externally_routable_ip + ' : ' + str ( self . CONNECT_REST_PORT )
class ConnectStandaloneService ( ConnectServiceBase ) :
@ -229,11 +272,13 @@ class ConnectStandaloneService(ConnectServiceBase):
@@ -229,11 +272,13 @@ class ConnectStandaloneService(ConnectServiceBase):
remote_connector_configs . append ( target_file )
self . logger . info ( " Starting Kafka Connect standalone process on " + str ( node . account ) )
with node . account . monitor_log ( self . LOG_FILE ) as monitor :
cmd = self . start_cmd ( node , remote_connector_configs )
self . logger . debug ( " Connect standalone command: %s " , cmd )
node . account . ssh ( cmd )
monitor . wait_until ( ' Kafka Connect started ' , timeout_sec = 60 , err_msg = " Never saw message indicating Kafka Connect finished startup on " + str ( node . account ) )
if self . startup_mode == self . STARTUP_MODE_LOAD :
self . start_and_wait_to_load_plugins ( node , ' standalone ' , remote_connector_configs )
elif self . startup_mode == self . STARTUP_MODE_INSTANT :
self . start_and_return_immediately ( node , ' standalone ' , remote_connector_configs )
else :
# The default mode is to wait until the complete startup of the worker
self . start_and_wait_to_start_listening ( node , ' standalone ' , remote_connector_configs )
if len ( self . pids ( node ) ) == 0 :
raise RuntimeError ( " No process ids recorded " )
@ -249,7 +294,8 @@ class ConnectDistributedService(ConnectServiceBase):
@@ -249,7 +294,8 @@ class ConnectDistributedService(ConnectServiceBase):
self . configs_topic = configs_topic
self . status_topic = status_topic
def start_cmd ( self , node ) :
# connector_configs argument is intentionally ignored in distributed service.
def start_cmd ( self , node , connector_configs ) :
cmd = " ( export KAFKA_LOG4J_OPTS= \" -Dlog4j.configuration=file: %s \" ; " % self . LOG4J_CONFIG_FILE
cmd + = " export KAFKA_OPTS= %s ; " % self . security_config . kafka_opts
for envvar in self . environment :
@ -268,11 +314,13 @@ class ConnectDistributedService(ConnectServiceBase):
@@ -268,11 +314,13 @@ class ConnectDistributedService(ConnectServiceBase):
raise DucktapeError ( " Config files are not valid in distributed mode, submit connectors via the REST API " )
self . logger . info ( " Starting Kafka Connect distributed process on " + str ( node . account ) )
with node . account . monitor_log ( self . LOG_FILE ) as monitor :
cmd = self . start_cmd ( node )
self . logger . debug ( " Connect distributed command: %s " , cmd )
node . account . ssh ( cmd )
monitor . wait_until ( ' Kafka Connect started ' , timeout_sec = 60 , err_msg = " Never saw message indicating Kafka Connect finished startup on " + str ( node . account ) )
if self . startup_mode == self . STARTUP_MODE_LOAD :
self . start_and_wait_to_load_plugins ( node , ' distributed ' , ' ' )
elif self . startup_mode == self . STARTUP_MODE_INSTANT :
self . start_and_return_immediately ( node , ' distributed ' , ' ' )
else :
# The default mode is to wait until the complete startup of the worker
self . start_and_wait_to_start_listening ( node , ' distributed ' , ' ' )
if len ( self . pids ( node ) ) == 0 :
raise RuntimeError ( " No process ids recorded " )