Browse Source

KAFKA-3606: Traverse CLASSPATH during herder start

ewencp Can you take a quick look?

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1252 from Ishiihara/pre-list-connectors
pull/1252/merge
Liquan Pei 9 years ago committed by Ewen Cheslack-Postava
parent
commit
c7f9bd2a68
  1. 49
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java

49
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java

@ -84,6 +84,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @@ -84,6 +84,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class);
private static List<ConnectorPluginInfo> validConnectorPlugins;
private static final Object LOCK = new Object();
private Thread classPathTraverser;
public AbstractHerder(Worker worker,
String workerId,
@ -101,12 +104,20 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @@ -101,12 +104,20 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
this.worker.start();
this.statusBackingStore.start();
this.configBackingStore.start();
traverseClassPath();
}
protected void stopServices() {
this.statusBackingStore.stop();
this.configBackingStore.stop();
this.worker.stop();
if (this.classPathTraverser != null) {
try {
this.classPathTraverser.join();
} catch (InterruptedException e) {
// ignore as it can only happen during shutdown
}
}
}
@Override
@ -248,22 +259,24 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @@ -248,22 +259,24 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
}
public static List<ConnectorPluginInfo> connectorPlugins() {
if (validConnectorPlugins != null) {
return validConnectorPlugins;
}
synchronized (LOCK) {
if (validConnectorPlugins != null) {
return validConnectorPlugins;
}
Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
connectorClasses.removeAll(SKIPPED_CONNECTORS);
List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>();
for (Class<? extends Connector> connectorClass: connectorClasses) {
int mod = connectorClass.getModifiers();
if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) {
connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
connectorClasses.removeAll(SKIPPED_CONNECTORS);
List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>();
for (Class<? extends Connector> connectorClass : connectorClasses) {
int mod = connectorClass.getModifiers();
if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) {
connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
}
}
validConnectorPlugins = connectorPlugins;
return connectorPlugins;
}
validConnectorPlugins = connectorPlugins;
return connectorPlugins;
}
// public for testing
@ -354,4 +367,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @@ -354,4 +367,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
return null;
}
}
private void traverseClassPath() {
classPathTraverser = new Thread(new Runnable() {
@Override
public void run() {
connectorPlugins();
}
}, "CLASSPATH traversal thread.");
classPathTraverser.start();
}
}

Loading…
Cancel
Save