From b6effcbba50eaa3abf91e24f0ae83abb53881add Mon Sep 17 00:00:00 2001 From: dan norwood Date: Thu, 11 May 2017 18:16:25 -0700 Subject: [PATCH] KAFKA-4343: Expose Connector type in REST API (KIP-151) https://cwiki.apache.org/confluence/display/KAFKA/KIP-151+Expose+Connector+type+in+REST+API Author: dan norwood Reviewers: Konstantine Karantasis , Ewen Cheslack-Postava Closes #2960 from norwood/KIP-151 --- .../connect/runtime/PluginDiscovery.java | 4 +- .../rest/entities/ConnectorPluginInfo.java | 77 +++++++++-- .../runtime/rest/entities/ConnectorType.java | 52 ++++++++ .../rest/entities/ConnectorTypeTest.java | 44 ++++++ .../ConnectorPluginsResourceTest.java | 126 ++++++++++++++++-- 5 files changed, 283 insertions(+), 20 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java index d31ce6d1d7a..482139a2792 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java @@ -78,14 +78,14 @@ public class PluginDiscovery { final List connectorPlugins = new ArrayList<>(connectorClasses.size()); for (Class connectorClass : connectorClasses) { if (isConcrete(connectorClass)) { - connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName())); + connectorPlugins.add(new ConnectorPluginInfo(connectorClass)); } } Collections.sort(connectorPlugins, new Comparator() { @Override public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) { - return a.clazz().compareTo(b.clazz()); + return a.className().compareTo(b.className()); } }); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java index 47e05bc6bcc..ff3c30ddd29 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java @@ -19,20 +19,74 @@ package org.apache.kafka.connect.runtime.rest.entities; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.connect.connector.Connector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; public class ConnectorPluginInfo { - private String clazz; + private static final Logger log = LoggerFactory.getLogger(ConnectorPluginInfo.class); + + private static final Map, String> + VERSIONS = new ConcurrentHashMap<>(); + + private String className; + private ConnectorType type; + private String version; @JsonCreator - public ConnectorPluginInfo(@JsonProperty("class") String clazz) { - this.clazz = clazz; + public ConnectorPluginInfo( + @JsonProperty("class") String className, + @JsonProperty("type") ConnectorType type, + @JsonProperty("version") String version + ) { + this.className = className; + this.type = type; + this.version = version; + } + + public ConnectorPluginInfo(Class klass) { + this(klass.getCanonicalName(), ConnectorType.from(klass), getVersion(klass)); + } + + private static String getVersion(Class klass) { + if (!VERSIONS.containsKey(klass)) { + synchronized (VERSIONS) { + if (!VERSIONS.containsKey(klass)) { + try { + VERSIONS.put(klass, klass.newInstance().version()); + } catch ( + ExceptionInInitializerError + | InstantiationException + | IllegalAccessException + | SecurityException e + ) { + log.warn("Unable to instantiate connector", e); + VERSIONS.put(klass, "unknown"); + } + } + } + } + return VERSIONS.get(klass); } @JsonProperty("class") - public String clazz() { - return clazz; + public String className() { + return className; + } + + @JsonProperty("type") + public ConnectorType type() { + return type; + } + + @JsonProperty("version") + public String version() { + return version; } @Override @@ -44,16 +98,23 @@ public class ConnectorPluginInfo { return false; } ConnectorPluginInfo that = (ConnectorPluginInfo) o; - return Objects.equals(clazz, that.clazz); + return Objects.equals(className, that.className) && + type == that.type && + Objects.equals(version, that.version); } @Override public int hashCode() { - return Objects.hash(clazz); + return Objects.hash(className, type, version); } @Override public String toString() { - return clazz; + final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{"); + sb.append("className='").append(className).append('\''); + sb.append(", type=").append(type); + sb.append(", version='").append(version).append('\''); + sb.append('}'); + return sb.toString(); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java new file mode 100644 index 00000000000..292a1ee0341 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.SourceConnector; + +import java.util.Locale; + +public enum ConnectorType { + SOURCE, SINK, UNKNOWN; + + public static ConnectorType from(Class clazz) { + if (SinkConnector.class.isAssignableFrom(clazz)) { + return SINK; + } + if (SourceConnector.class.isAssignableFrom(clazz)) { + return SOURCE; + } + + return UNKNOWN; + } + + @Override + @JsonValue + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } + + @JsonCreator + public static ConnectorType forValue(String value) { + return ConnectorType.valueOf(value.toUpperCase(Locale.ROOT)); + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java new file mode 100644 index 00000000000..cd07bd85573 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest.entities; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ConnectorTypeTest { + + @Test + public void testToStringIsLowerCase() { + for (ConnectorType ct : ConnectorType.values()) { + String shouldBeLower = ct.toString(); + assertFalse(shouldBeLower.isEmpty()); + for (Character c : shouldBeLower.toCharArray()) { + assertTrue(Character.isLowerCase(c)); + } + } + } + + @Test + public void testForValue() { + for (ConnectorType ct : ConnectorType.values()) { + assertEquals(ct, ConnectorType.forValue(ct.toString())); + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 7ba6fd20154..966098c24b3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime.rest.resources; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; @@ -36,6 +37,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.tools.MockConnector; @@ -55,6 +57,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -358,18 +361,121 @@ public class ConnectorPluginsResourceTest { @Test public void testListConnectorPlugins() { Set connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins()); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class.getCanonicalName()))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class.getCanonicalName()))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class.getCanonicalName()))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName()))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName()))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class.getCanonicalName()))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class.getCanonicalName()))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class.getCanonicalName()))); - assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class.getCanonicalName()))); - assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName()))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class))); + assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class))); } + @Test + public void testConnectorPluginsIncludesTypeAndVersionInformation() + throws IOException { + ConnectorPluginInfo sinkInfo = new ConnectorPluginInfo(TestSinkConnector.class); + ConnectorPluginInfo sourceInfo = new ConnectorPluginInfo(TestSourceConnector.class); + ConnectorPluginInfo unkownInfo = + new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class); + assertEquals(ConnectorType.SINK, sinkInfo.type()); + assertEquals(ConnectorType.SOURCE, sourceInfo.type()); + assertEquals(ConnectorType.UNKNOWN, unkownInfo.type()); + assertEquals(TestSinkConnector.VERSION, sinkInfo.version()); + assertEquals(TestSourceConnector.VERSION, sourceInfo.version()); + + final ObjectMapper objectMapper = new ObjectMapper(); + String serializedSink = objectMapper.writeValueAsString(ConnectorType.SINK); + String serializedSource = objectMapper.writeValueAsString(ConnectorType.SOURCE); + String serializedUnknown = objectMapper.writeValueAsString(ConnectorType.UNKNOWN); + assertTrue(serializedSink.contains("sink")); + assertTrue(serializedSource.contains("source")); + assertTrue(serializedUnknown.contains("unknown")); + assertEquals( + ConnectorType.SINK, + objectMapper.readValue(serializedSink, ConnectorType.class) + ); + assertEquals( + ConnectorType.SOURCE, + objectMapper.readValue(serializedSource, ConnectorType.class) + ); + assertEquals( + ConnectorType.UNKNOWN, + objectMapper.readValue(serializedUnknown, ConnectorType.class) + ); + } + + public static class TestSinkConnector extends SinkConnector { + + static final String VERSION = "some great version"; + + @Override + public String version() { + return VERSION; + } + + @Override + public void start(Map props) { + + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + + } + + @Override + public ConfigDef config() { + return null; + } + } + + public static class TestSourceConnector extends SourceConnector { + + static final String VERSION = "an entirely different version"; + + @Override + public String version() { + return VERSION; + } + + @Override + public void start(Map props) { + + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + + } + + @Override + public ConfigDef config() { + return null; + } + } /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class ConnectorPluginsResourceTestConnector extends Connector {