@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins ;
import org.apache.kafka.connect.transforms.Transformation ;
import java.lang.reflect.Modifier ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.HashMap ;
@ -37,6 +38,8 @@ import java.util.LinkedHashSet;
@@ -37,6 +38,8 @@ import java.util.LinkedHashSet;
import java.util.List ;
import java.util.Locale ;
import java.util.Map ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars ;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast ;
@ -329,11 +332,25 @@ public class ConnectorConfig extends AbstractConfig {
@@ -329,11 +332,25 @@ public class ConnectorConfig extends AbstractConfig {
if ( transformationCls = = null | | ! Transformation . class . isAssignableFrom ( transformationCls ) ) {
throw new ConfigException ( key , String . valueOf ( transformationCls ) , "Not a Transformation" ) ;
}
if ( Modifier . isAbstract ( transformationCls . getModifiers ( ) ) ) {
String childClassNames = Stream . of ( transformationCls . getClasses ( ) )
. filter ( transformationCls : : isAssignableFrom )
. filter ( c - > ! Modifier . isAbstract ( c . getModifiers ( ) ) )
. filter ( c - > Modifier . isPublic ( c . getModifiers ( ) ) )
. map ( Class : : getName )
. collect ( Collectors . joining ( ", " ) ) ;
String message = childClassNames . trim ( ) . isEmpty ( ) ?
"Transformation is abstract and cannot be created." :
"Transformation is abstract and cannot be created. Did you mean " + childClassNames + "?" ;
throw new ConfigException ( key , String . valueOf ( transformationCls ) , message ) ;
}
Transformation transformation ;
try {
transformation = transformationCls . asSubclass ( Transformation . class ) . getConstructor ( ) . newInstance ( ) ;
} catch ( Exception e ) {
throw new ConfigException ( key , String . valueOf ( transformationCls ) , "Error getting config definition from Transformation: " + e . getMessage ( ) ) ;
ConfigException exception = new ConfigException ( key , String . valueOf ( transformationCls ) , "Error getting config definition from Transformation: " + e . getMessage ( ) ) ;
exception . initCause ( e ) ;
throw exception ;
}
ConfigDef configDef = transformation . config ( ) ;
if ( null = = configDef ) {