diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index dd387c45f52..144dbd87f55 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -62,6 +62,7 @@ import java.util.stream.Collectors; public class DelegatingClassLoader extends URLClassLoader { private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); private static final String CLASSPATH_NAME = "classpath"; + private static final String UNDEFINED_VERSION = "undefined"; private final Map, ClassLoader>> pluginLoaders; private final Map aliases; @@ -318,7 +319,7 @@ public class DelegatingClassLoader extends URLClassLoader { Collection> result = new ArrayList<>(); for (Class plugin : plugins) { if (PluginUtils.isConcrete(plugin)) { - result.add(new PluginDesc<>(plugin, versionFor(plugin.newInstance()), loader)); + result.add(new PluginDesc<>(plugin, versionFor(plugin), loader)); } else { log.debug("Skipping {} as it is not concrete implementation", plugin); } @@ -336,7 +337,12 @@ public class DelegatingClassLoader extends URLClassLoader { } private static String versionFor(T pluginImpl) { - return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : "undefined"; + return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : UNDEFINED_VERSION; + } + + private static String versionFor(Class pluginKlass) throws IllegalAccessException, InstantiationException { + // Temporary workaround until all the plugins are versioned. + return Connector.class.isAssignableFrom(pluginKlass) ? versionFor(pluginKlass.newInstance()) : UNDEFINED_VERSION; } @Override