Browse Source

KAFKA-7039: Create an instance of the plugin only it's a Versioned Plugin

Create an instance of the plugin only it's a Versioned Plugin. Prior to KIP-285, this was done for only for Connector and this PR will continue to have the same behavior.

Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>

Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #5191 from mageshn/KAFKA-7039
pull/5204/merge
Magesh Nandakumar 7 years ago committed by Ewen Cheslack-Postava
parent
commit
239dd0fb9b
  1. 10
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java

10
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java

@ -62,6 +62,7 @@ import java.util.stream.Collectors; @@ -62,6 +62,7 @@ import java.util.stream.Collectors;
public class DelegatingClassLoader extends URLClassLoader {
private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
private static final String CLASSPATH_NAME = "classpath";
private static final String UNDEFINED_VERSION = "undefined";
private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
private final Map<String, String> aliases;
@ -318,7 +319,7 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -318,7 +319,7 @@ public class DelegatingClassLoader extends URLClassLoader {
Collection<PluginDesc<T>> result = new ArrayList<>();
for (Class<? extends T> plugin : plugins) {
if (PluginUtils.isConcrete(plugin)) {
result.add(new PluginDesc<>(plugin, versionFor(plugin.newInstance()), loader));
result.add(new PluginDesc<>(plugin, versionFor(plugin), loader));
} else {
log.debug("Skipping {} as it is not concrete implementation", plugin);
}
@ -336,7 +337,12 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -336,7 +337,12 @@ public class DelegatingClassLoader extends URLClassLoader {
}
private static <T> String versionFor(T pluginImpl) {
return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : "undefined";
return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : UNDEFINED_VERSION;
}
private static <T> String versionFor(Class<? extends T> pluginKlass) throws IllegalAccessException, InstantiationException {
// Temporary workaround until all the plugins are versioned.
return Connector.class.isAssignableFrom(pluginKlass) ? versionFor(pluginKlass.newInstance()) : UNDEFINED_VERSION;
}
@Override

Loading…
Cancel
Save