From 239dd0fb9b8a72ff0f108fea9a9372ab0858339c Mon Sep 17 00:00:00 2001 From: Magesh Nandakumar Date: Sat, 16 Jun 2018 22:18:20 -0700 Subject: [PATCH] KAFKA-7039: Create an instance of the plugin only it's a Versioned Plugin Create an instance of the plugin only it's a Versioned Plugin. Prior to KIP-285, this was done for only for Connector and this PR will continue to have the same behavior. Author: Magesh Nandakumar Reviewers: Randall Hauch , Konstantine Karantasis , Ewen Cheslack-Postava Closes #5191 from mageshn/KAFKA-7039 --- .../runtime/isolation/DelegatingClassLoader.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index dd387c45f52..144dbd87f55 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -62,6 +62,7 @@ import java.util.stream.Collectors; public class DelegatingClassLoader extends URLClassLoader { private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); private static final String CLASSPATH_NAME = "classpath"; + private static final String UNDEFINED_VERSION = "undefined"; private final Map, ClassLoader>> pluginLoaders; private final Map aliases; @@ -318,7 +319,7 @@ public class DelegatingClassLoader extends URLClassLoader { Collection> result = new ArrayList<>(); for (Class plugin : plugins) { if (PluginUtils.isConcrete(plugin)) { - result.add(new PluginDesc<>(plugin, versionFor(plugin.newInstance()), loader)); + result.add(new PluginDesc<>(plugin, versionFor(plugin), loader)); } else { log.debug("Skipping {} as it is not concrete implementation", plugin); } @@ -336,7 +337,12 @@ public class DelegatingClassLoader extends URLClassLoader { } private static String versionFor(T pluginImpl) { - return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : "undefined"; + return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : UNDEFINED_VERSION; + } + + private static String versionFor(Class pluginKlass) throws IllegalAccessException, InstantiationException { + // Temporary workaround until all the plugins are versioned. + return Connector.class.isAssignableFrom(pluginKlass) ? versionFor(pluginKlass.newInstance()) : UNDEFINED_VERSION; } @Override