Browse Source

KAFKA-4343: Expose Connector type in REST API (KIP-151)

https://cwiki.apache.org/confluence/display/KAFKA/KIP-151+Expose+Connector+type+in+REST+API

Author: dan norwood <norwood@confluent.io>

Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2960 from norwood/KIP-151
pull/2960/merge
dan norwood 8 years ago committed by Ewen Cheslack-Postava
parent
commit
b6effcbba5
  1. 4
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
  2. 77
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
  3. 52
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java
  4. 44
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java
  5. 126
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java

4
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java

@ -78,14 +78,14 @@ public class PluginDiscovery { @@ -78,14 +78,14 @@ public class PluginDiscovery {
final List<ConnectorPluginInfo> connectorPlugins = new ArrayList<>(connectorClasses.size());
for (Class<? extends Connector> connectorClass : connectorClasses) {
if (isConcrete(connectorClass)) {
connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
connectorPlugins.add(new ConnectorPluginInfo(connectorClass));
}
}
Collections.sort(connectorPlugins, new Comparator<ConnectorPluginInfo>() {
@Override
public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) {
return a.clazz().compareTo(b.clazz());
return a.className().compareTo(b.className());
}
});

77
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java

@ -19,20 +19,74 @@ package org.apache.kafka.connect.runtime.rest.entities; @@ -19,20 +19,74 @@ package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.connect.connector.Connector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public class ConnectorPluginInfo {
private String clazz;
private static final Logger log = LoggerFactory.getLogger(ConnectorPluginInfo.class);
private static final Map<Class<? extends Connector>, String>
VERSIONS = new ConcurrentHashMap<>();
private String className;
private ConnectorType type;
private String version;
@JsonCreator
public ConnectorPluginInfo(@JsonProperty("class") String clazz) {
this.clazz = clazz;
public ConnectorPluginInfo(
@JsonProperty("class") String className,
@JsonProperty("type") ConnectorType type,
@JsonProperty("version") String version
) {
this.className = className;
this.type = type;
this.version = version;
}
public ConnectorPluginInfo(Class<? extends Connector> klass) {
this(klass.getCanonicalName(), ConnectorType.from(klass), getVersion(klass));
}
private static String getVersion(Class<? extends Connector> klass) {
if (!VERSIONS.containsKey(klass)) {
synchronized (VERSIONS) {
if (!VERSIONS.containsKey(klass)) {
try {
VERSIONS.put(klass, klass.newInstance().version());
} catch (
ExceptionInInitializerError
| InstantiationException
| IllegalAccessException
| SecurityException e
) {
log.warn("Unable to instantiate connector", e);
VERSIONS.put(klass, "unknown");
}
}
}
}
return VERSIONS.get(klass);
}
@JsonProperty("class")
public String clazz() {
return clazz;
public String className() {
return className;
}
@JsonProperty("type")
public ConnectorType type() {
return type;
}
@JsonProperty("version")
public String version() {
return version;
}
@Override
@ -44,16 +98,23 @@ public class ConnectorPluginInfo { @@ -44,16 +98,23 @@ public class ConnectorPluginInfo {
return false;
}
ConnectorPluginInfo that = (ConnectorPluginInfo) o;
return Objects.equals(clazz, that.clazz);
return Objects.equals(className, that.className) &&
type == that.type &&
Objects.equals(version, that.version);
}
@Override
public int hashCode() {
return Objects.hash(clazz);
return Objects.hash(className, type, version);
}
@Override
public String toString() {
return clazz;
final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
sb.append("className='").append(className).append('\'');
sb.append(", type=").append(type);
sb.append(", version='").append(version).append('\'');
sb.append('}');
return sb.toString();
}
}

52
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java

@ -0,0 +1,52 @@ @@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.Locale;
public enum ConnectorType {
SOURCE, SINK, UNKNOWN;
public static ConnectorType from(Class<? extends Connector> clazz) {
if (SinkConnector.class.isAssignableFrom(clazz)) {
return SINK;
}
if (SourceConnector.class.isAssignableFrom(clazz)) {
return SOURCE;
}
return UNKNOWN;
}
@Override
@JsonValue
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
@JsonCreator
public static ConnectorType forValue(String value) {
return ConnectorType.valueOf(value.toUpperCase(Locale.ROOT));
}
}

44
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java

@ -0,0 +1,44 @@ @@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest.entities;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ConnectorTypeTest {
@Test
public void testToStringIsLowerCase() {
for (ConnectorType ct : ConnectorType.values()) {
String shouldBeLower = ct.toString();
assertFalse(shouldBeLower.isEmpty());
for (Character c : shouldBeLower.toCharArray()) {
assertTrue(Character.isLowerCase(c));
}
}
}
@Test
public void testForValue() {
for (ConnectorType ct : ConnectorType.values()) {
assertEquals(ct, ConnectorType.forValue(ct.toString()));
}
}
}

126
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
@ -36,6 +37,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; @@ -36,6 +37,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.tools.MockConnector;
@ -55,6 +57,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -55,6 +57,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -358,18 +361,121 @@ public class ConnectorPluginsResourceTest { @@ -358,18 +361,121 @@ public class ConnectorPluginsResourceTest {
@Test
public void testListConnectorPlugins() {
Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins());
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class.getCanonicalName())));
assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class)));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class)));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class)));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class)));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class)));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class)));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class)));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class)));
assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class)));
assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class)));
}
@Test
public void testConnectorPluginsIncludesTypeAndVersionInformation()
throws IOException {
ConnectorPluginInfo sinkInfo = new ConnectorPluginInfo(TestSinkConnector.class);
ConnectorPluginInfo sourceInfo = new ConnectorPluginInfo(TestSourceConnector.class);
ConnectorPluginInfo unkownInfo =
new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class);
assertEquals(ConnectorType.SINK, sinkInfo.type());
assertEquals(ConnectorType.SOURCE, sourceInfo.type());
assertEquals(ConnectorType.UNKNOWN, unkownInfo.type());
assertEquals(TestSinkConnector.VERSION, sinkInfo.version());
assertEquals(TestSourceConnector.VERSION, sourceInfo.version());
final ObjectMapper objectMapper = new ObjectMapper();
String serializedSink = objectMapper.writeValueAsString(ConnectorType.SINK);
String serializedSource = objectMapper.writeValueAsString(ConnectorType.SOURCE);
String serializedUnknown = objectMapper.writeValueAsString(ConnectorType.UNKNOWN);
assertTrue(serializedSink.contains("sink"));
assertTrue(serializedSource.contains("source"));
assertTrue(serializedUnknown.contains("unknown"));
assertEquals(
ConnectorType.SINK,
objectMapper.readValue(serializedSink, ConnectorType.class)
);
assertEquals(
ConnectorType.SOURCE,
objectMapper.readValue(serializedSource, ConnectorType.class)
);
assertEquals(
ConnectorType.UNKNOWN,
objectMapper.readValue(serializedUnknown, ConnectorType.class)
);
}
public static class TestSinkConnector extends SinkConnector {
static final String VERSION = "some great version";
@Override
public String version() {
return VERSION;
}
@Override
public void start(Map<String, String> props) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
}
public static class TestSourceConnector extends SourceConnector {
static final String VERSION = "an entirely different version";
@Override
public String version() {
return VERSION;
}
@Override
public void start(Map<String, String> props) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
}
/* Name here needs to be unique as we are testing the aliasing mechanism */
public static class ConnectorPluginsResourceTestConnector extends Connector {

Loading…
Cancel
Save