@ -285,26 +285,31 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
@@ -285,26 +285,31 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
env_opts = " \" %s %s \" " % ( env_opts . strip ( ' \" ' ) , value )
self . environment [ envvar ] = env_opts
def append_filestream_connectors_to_classpath ( self ) :
def maybe_ append_filestream_connectors_to_classpath( self ) :
if self . include_filestream_connectors :
cwd = os . getcwd ( )
self . logger . info ( " Including filestream connectors when starting Connect. "
" Looking for jar locally in: %s " % cwd )
relative_path = " /connect/file/build/libs/ "
local_dir = cwd + relative_path
lib_dir = self . path . home ( ) + relative_path
for pwd , dirs , files in os . walk ( local_dir ) :
for file in files :
if file . startswith ( " connect-file " ) and file . endswith ( " .jar " ) :
# Use the expected directory on the node instead of the path in the driver node
file_path = lib_dir + file
self . logger . debug ( " Appending %s to Connect worker ' s CLASSPATH " % file_path )
return " export CLASSPATH=$ {CLASSPATH} : %s ; " % file_path
self . logger . info ( " Jar with filestream connectors was not found under %s " % lib_dir )
return self . append_module_to_classpath ( " file " )
else :
self . logger . info ( " Starting Connect without filestream connectors in the CLASSPATH " )
return " "
return None
def append_test_plugins_to_classpath ( self ) :
return self . append_module_to_classpath ( " test-plugins " )
def append_module_to_classpath ( self , module ) :
cwd = os . getcwd ( )
relative_path = " /connect/ " + module + " /build/libs/ "
local_dir = cwd + relative_path
lib_dir = self . path . home ( ) + relative_path
for pwd , dirs , files in os . walk ( local_dir ) :
for file in files :
if file . endswith ( " .jar " ) :
# Use the expected directory on the node instead of the path in the driver node
file_path = lib_dir + file
self . logger . info ( " Appending %s to Connect worker ' s CLASSPATH " % file_path )
return " export CLASSPATH=$ {CLASSPATH} : %s ; " % file_path
self . logger . info ( " Jar not found within %s " % local_dir )
return " "
class ConnectStandaloneService ( ConnectServiceBase ) :
@ -327,8 +332,8 @@ class ConnectStandaloneService(ConnectServiceBase):
@@ -327,8 +332,8 @@ class ConnectStandaloneService(ConnectServiceBase):
cmd + = fix_opts_for_new_jvm ( node )
cmd + = " export KAFKA_OPTS= \" %s %s \" ; " % ( heap_kafka_opts , other_kafka_opts )
classpath = self . append_filestream_connector s_to_classpath ( )
cmd + = classpath if classpath else " "
cmd + = self . append_test_plugin s_to_classpath ( )
cmd + = self . maybe_append_filestream_connectors_to_classpath ( )
for envvar in self . environment :
cmd + = " export %s = %s ; " % ( envvar , str ( self . environment [ envvar ] ) )
@ -388,8 +393,8 @@ class ConnectDistributedService(ConnectServiceBase):
@@ -388,8 +393,8 @@ class ConnectDistributedService(ConnectServiceBase):
for envvar in self . environment :
cmd + = " export %s = %s ; " % ( envvar , str ( self . environment [ envvar ] ) )
classpath = self . append_filestream_connectors_to_classpath ( )
cmd + = classpath if classpath else " "
cmd + = self . maybe_ append_filestream_connectors_to_classpath( )
cmd + = self . append_test_plugins_to_classpath ( )
cmd + = " %s %s " % ( self . path . script ( " connect-distributed.sh " , node ) , self . CONFIG_FILE )
cmd + = " & echo $! >&3 ) 1>> %s 2>> %s 3> %s " % ( self . STDOUT_FILE , self . STDERR_FILE , self . PID_FILE )
return cmd