diff --git a/build.gradle b/build.gradle index 6bb0363fbf0..75db84a62dd 100644 --- a/build.gradle +++ b/build.gradle @@ -36,6 +36,7 @@ def powermock_easymock='org.powermock:powermock-api-easymock:1.6.3' def jackson_version = '2.6.3' def jetty_version = '9.2.14.v20151106' def jersey_version = '2.22.1' +def reflections_version = '0.9.10' allprojects { apply plugin: 'idea' @@ -790,6 +791,7 @@ project(':connect:runtime') { compile "org.eclipse.jetty:jetty-servlet:$jetty_version" compile "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$jackson_version" compile "org.glassfish.jersey.containers:jersey-container-servlet:$jersey_version" + compile "org.reflections:reflections:$reflections_version" testCompile "$junit" testCompile "$easymock" diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a65a2dc3ad0..a663cf7c5aa 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -171,6 +171,8 @@ + + diff --git a/config/connect-file-sink.properties b/config/connect-file-sink.properties index e2cf361e2bf..594ccc6e953 100644 --- a/config/connect-file-sink.properties +++ b/config/connect-file-sink.properties @@ -14,7 +14,7 @@ # limitations under the License. name=local-file-sink -connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector +connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test \ No newline at end of file diff --git a/config/connect-file-source.properties b/config/connect-file-source.properties index df92d44af66..599cf4cb2ac 100644 --- a/config/connect-file-source.properties +++ b/config/connect-file-source.properties @@ -14,7 +14,7 @@ # limitations under the License. name=local-file-source -connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector +connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test \ No newline at end of file diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 77cfc8dc919..4824acdc507 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -43,7 +43,9 @@ public class ConnectorConfig extends AbstractConfig { public static final String CONNECTOR_CLASS_CONFIG = "connector.class"; private static final String CONNECTOR_CLASS_DOC = - "Name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector"; + "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. " + + "If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, " + + " or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter"; public static final String TASKS_MAX_CONFIG = "tasks.max"; private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; @@ -58,7 +60,7 @@ public class ConnectorConfig extends AbstractConfig { static { config = new ConfigDef() .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC) - .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC) + .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC) .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC) .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 3898ad60aac..4766cf7728b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -27,6 +27,7 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.Converter; @@ -35,15 +36,20 @@ import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.util.ConnectorTaskId; +import org.reflections.Reflections; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; + /** *

* Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving @@ -170,15 +176,9 @@ public class Worker { */ public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) { String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); - Class maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - log.info("Creating connector {} of type {}", connName, maybeConnClass.getName()); + Class connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); - Class connClass; - try { - connClass = maybeConnClass.asSubclass(Connector.class); - } catch (ClassCastException e) { - throw new ConnectException("Specified class is not a subclass of Connector: " + maybeConnClass.getName()); - } + log.info("Creating connector {} of type {}", connName, connClass.getName()); if (connectors.containsKey(connName)) throw new ConnectException("Connector with name " + connName + " already exists"); @@ -197,6 +197,54 @@ public class Worker { log.info("Finished creating connector {}", connName); } + /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */ + public boolean isSinkConnector(String connName) { + return SinkConnector.class.isAssignableFrom(connectors.get(connName).getClass()); + } + + + // Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration + private Class getConnectorClass(String connectorAlias) { + Reflections reflections = new Reflections(new ConfigurationBuilder() + .setUrls(ClasspathHelper.forJavaClassPath())); + + Set> connectors = reflections.getSubTypesOf(Connector.class); + + List> results = new ArrayList<>(); + + for (Class connector: connectors) { + // Configuration included the fully qualified class name + if (connector.getName().equals(connectorAlias)) + results.add(connector); + + // Configuration included the class name but not package + if (connector.getSimpleName().equals(connectorAlias)) + results.add(connector); + + // Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector) + if (connector.getSimpleName().equals(connectorAlias + "Connector")) + results.add(connector); + } + + if (results.isEmpty()) + throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorAlias + " available connectors are: " + connectorNames(connectors)); + if (results.size() > 1) { + throw new ConnectException("More than one connector matches alias " + connectorAlias + ". Please use full package + class name instead. Classes found: " + connectorNames(results)); + } + + // We just validated that we have exactly one result, so this is safe + return results.get(0); + } + + private String connectorNames(Collection> connectors) { + StringBuilder names = new StringBuilder(); + for (Class c : connectors) + names.append(c.getName()).append(", "); + + return names.substring(0, names.toString().length() - 2); + } + + private static Connector instantiateConnector(Class connClass) { try { return Utils.newInstance(connClass); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 7caaabb2439..a9e8dd59b8b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -34,7 +34,6 @@ import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; -import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.storage.KafkaConfigStorage; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -709,9 +708,8 @@ public class DistributedHerder implements Herder, Runnable { try { Map configs = configState.connectorConfig(connName); ConnectorConfig connConfig = new ConnectorConfig(configs); - List sinkTopics = null; - if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG))) + if (worker.isSinkConnector(connName)) sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); final List> taskProps diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 4e25e9d88d9..335e0ce7e74 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -91,14 +91,14 @@ public class WorkerTest extends ThreadedTest { ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class); PowerMock.mockStatic(Worker.class); - PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector); + PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector); EasyMock.expect(connector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); - props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName()); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); connector.initialize(ctx); EasyMock.expectLastCall(); @@ -135,6 +135,110 @@ public class WorkerTest extends ThreadedTest { PowerMock.verifyAll(); } + @Test + public void testAddConnectorByAlias() throws Exception { + offsetBackingStore.configure(EasyMock.anyObject(Map.class)); + EasyMock.expectLastCall(); + offsetBackingStore.start(); + EasyMock.expectLastCall(); + + // Create + Connector connector = PowerMock.createMock(Connector.class); + ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class); + + PowerMock.mockStatic(Worker.class); + PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector); + EasyMock.expect(connector.version()).andReturn("1.0"); + + Map props = new HashMap<>(); + props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); + props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector"); + + connector.initialize(ctx); + EasyMock.expectLastCall(); + connector.start(props); + EasyMock.expectLastCall(); + + // Remove + connector.stop(); + EasyMock.expectLastCall(); + + offsetBackingStore.stop(); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + + worker = new Worker(new MockTime(), config, offsetBackingStore); + worker.start(); + + ConnectorConfig config = new ConnectorConfig(props); + assertEquals(Collections.emptySet(), worker.connectorNames()); + worker.addConnector(config, ctx); + assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); + + worker.stopConnector(CONNECTOR_ID); + assertEquals(Collections.emptySet(), worker.connectorNames()); + // Nothing should be left, so this should effectively be a nop + worker.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testAddConnectorByShortAlias() throws Exception { + offsetBackingStore.configure(EasyMock.anyObject(Map.class)); + EasyMock.expectLastCall(); + offsetBackingStore.start(); + EasyMock.expectLastCall(); + + // Create + Connector connector = PowerMock.createMock(Connector.class); + ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class); + + PowerMock.mockStatic(Worker.class); + PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector); + EasyMock.expect(connector.version()).andReturn("1.0"); + + Map props = new HashMap<>(); + props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); + props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest"); + + connector.initialize(ctx); + EasyMock.expectLastCall(); + connector.start(props); + EasyMock.expectLastCall(); + + // Remove + connector.stop(); + EasyMock.expectLastCall(); + + offsetBackingStore.stop(); + EasyMock.expectLastCall(); + + PowerMock.replayAll(); + + worker = new Worker(new MockTime(), config, offsetBackingStore); + worker.start(); + + ConnectorConfig config = new ConnectorConfig(props); + assertEquals(Collections.emptySet(), worker.connectorNames()); + worker.addConnector(config, ctx); + assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames()); + + worker.stopConnector(CONNECTOR_ID); + assertEquals(Collections.emptySet(), worker.connectorNames()); + // Nothing should be left, so this should effectively be a nop + worker.stop(); + + PowerMock.verifyAll(); + } + + @Test(expected = ConnectException.class) public void testStopInvalidConnector() { offsetBackingStore.configure(EasyMock.anyObject(Map.class)); @@ -162,14 +266,14 @@ public class WorkerTest extends ThreadedTest { ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class); PowerMock.mockStatic(Worker.class); - PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector); + PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector); EasyMock.expect(connector.version()).andReturn("1.0"); Map props = new HashMap<>(); props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); - props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName()); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); connector.initialize(ctx); EasyMock.expectLastCall(); @@ -345,7 +449,8 @@ public class WorkerTest extends ThreadedTest { } - private static class TestConnector extends Connector { + /* Name here needs to be unique as we are testing the aliasing mechanism */ + private static class WorkerTestConnector extends Connector { @Override public String version() { return "1.0"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 36f8fced2b2..76f9bc05bf7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -136,6 +136,7 @@ public class DistributedHerderTest { @Before public void setUp() throws Exception { worker = PowerMock.createMock(Worker.class); + EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE); time = new MockTime(); herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"}, @@ -487,6 +488,7 @@ public class DistributedHerderTest { worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.anyObject()); PowerMock.expectLastCall(); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); diff --git a/tests/kafkatest/tests/templates/connect-file-sink.properties b/tests/kafkatest/tests/templates/connect-file-sink.properties index f52c26e3c6f..ad78bb366db 100644 --- a/tests/kafkatest/tests/templates/connect-file-sink.properties +++ b/tests/kafkatest/tests/templates/connect-file-sink.properties @@ -14,7 +14,7 @@ # limitations under the License. name=local-file-sink -connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector +connector.class=FileStreamSink tasks.max=1 file={{ OUTPUT_FILE }} topics={{ TOPIC }} \ No newline at end of file diff --git a/tests/kafkatest/tests/templates/connect-file-source.properties b/tests/kafkatest/tests/templates/connect-file-source.properties index e8a6f05c995..d2d5e974766 100644 --- a/tests/kafkatest/tests/templates/connect-file-source.properties +++ b/tests/kafkatest/tests/templates/connect-file-source.properties @@ -14,7 +14,7 @@ # limitations under the License. name=local-file-source -connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector +connector.class=FileStreamSource tasks.max=1 file={{ INPUT_FILE }} topic={{ TOPIC }} \ No newline at end of file