diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index cce1573a30d..1cf93a4fd64 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.transforms.Transformation; +import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -37,6 +38,8 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; @@ -329,11 +332,25 @@ public class ConnectorConfig extends AbstractConfig { if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) { throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation"); } + if (Modifier.isAbstract(transformationCls.getModifiers())) { + String childClassNames = Stream.of(transformationCls.getClasses()) + .filter(transformationCls::isAssignableFrom) + .filter(c -> !Modifier.isAbstract(c.getModifiers())) + .filter(c -> Modifier.isPublic(c.getModifiers())) + .map(Class::getName) + .collect(Collectors.joining(", ")); + String message = childClassNames.trim().isEmpty() ? + "Transformation is abstract and cannot be created." : + "Transformation is abstract and cannot be created. Did you mean " + childClassNames + "?"; + throw new ConfigException(key, String.valueOf(transformationCls), message); + } Transformation transformation; try { transformation = transformationCls.asSubclass(Transformation.class).getConstructor().newInstance(); } catch (Exception e) { - throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage()); + ConfigException exception = new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage()); + exception.initCause(e); + throw exception; } ConfigDef configDef = transformation.config(); if (null == configDef) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java index fe1bf264654..f674a8e22f2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java @@ -177,4 +177,77 @@ public class ConnectorConfigTest> { assertEquals(84, ((SimpleTransformation) transformations.get(1)).magicNumber); } + @Test + public void abstractTransform() { + Map props = new HashMap<>(); + props.put("name", "test"); + props.put("connector.class", TestConnector.class.getName()); + props.put("transforms", "a"); + props.put("transforms.a.type", AbstractTransformation.class.getName()); + try { + new ConnectorConfig(MOCK_PLUGINS, props); + } catch (ConfigException ex) { + assertTrue( + ex.getMessage().contains("Transformation is abstract and cannot be created.") + ); + } + } + @Test + public void abstractKeyValueTransform() { + Map props = new HashMap<>(); + props.put("name", "test"); + props.put("connector.class", TestConnector.class.getName()); + props.put("transforms", "a"); + props.put("transforms.a.type", AbstractKeyValueTransformation.class.getName()); + try { + new ConnectorConfig(MOCK_PLUGINS, props); + } catch (ConfigException ex) { + assertTrue( + ex.getMessage().contains("Transformation is abstract and cannot be created.") + ); + assertTrue( + ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName()) + ); + assertTrue( + ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName()) + ); + } + } + + public static abstract class AbstractTransformation> implements Transformation { + + } + + public static abstract class AbstractKeyValueTransformation> implements Transformation { + @Override + public R apply(R record) { + return null; + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } + + + public static class Key extends AbstractKeyValueTransformation { + + + } + public static class Value extends AbstractKeyValueTransformation { + + } + } + + }