Browse Source

MINOR: Traverse plugin path recursively in Connect (KIP-146)

Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3173 from kkonstantine/MINOR-Traverse-plugin-path-recursively-in-Connect
pull/3160/merge
Konstantine Karantasis 8 years ago committed by Ewen Cheslack-Postava
parent
commit
e0150a25e8
  1. 99
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
  2. 76
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
  3. 17
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
  4. 188
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
  5. 10
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java

99
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java

@ -1,99 +0,0 @@ @@ -1,99 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.reflections.Reflections;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
public class ConnectorFactory {
public Connector newConnector(String connectorClassOrAlias) {
return instantiate(getConnectorClass(connectorClassOrAlias));
}
public Task newTask(Class<? extends Task> taskClass) {
return instantiate(taskClass);
}
private static <T> T instantiate(Class<? extends T> cls) {
try {
return Utils.newInstance(cls);
} catch (Throwable t) {
throw new ConnectException("Instantiation error", t);
}
}
@SuppressWarnings("unchecked")
private static Class<? extends Connector> getConnectorClass(String connectorClassOrAlias) {
// Avoid the classpath scan if the full class name was provided
try {
Class<?> clazz = Class.forName(connectorClassOrAlias);
if (!Connector.class.isAssignableFrom(clazz))
throw new ConnectException("Class " + connectorClassOrAlias + " does not implement Connector");
return (Class<? extends Connector>) clazz;
} catch (ClassNotFoundException e) {
// Fall through to scan for the alias
}
// Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration
Reflections reflections = new Reflections(new ConfigurationBuilder()
.setUrls(ClasspathHelper.forJavaClassPath()));
Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
List<Class<? extends Connector>> results = new ArrayList<>();
for (Class<? extends Connector> connector: connectors) {
// Configuration included the class name but not package
if (connector.getSimpleName().equals(connectorClassOrAlias))
results.add(connector);
// Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector)
if (connector.getSimpleName().equals(connectorClassOrAlias + "Connector"))
results.add(connector);
}
if (results.isEmpty())
throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorClassOrAlias +
", available connectors are: " + connectorNames(connectors));
if (results.size() > 1) {
throw new ConnectException("More than one connector matches alias " + connectorClassOrAlias +
". Please use full package and class name instead. Classes found: " + connectorNames(results));
}
// We just validated that we have exactly one result, so this is safe
return results.get(0);
}
private static String connectorNames(Collection<Class<? extends Connector>> connectors) {
StringBuilder names = new StringBuilder();
for (Class<?> c : connectors)
names.append(c.getName()).append(", ");
return names.substring(0, names.toString().length() - 2);
}
}

76
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java

@ -51,6 +51,7 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -51,6 +51,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
private final Map<String, String> aliases;
private final SortedSet<PluginDesc<Connector>> connectors;
private final SortedSet<PluginDesc<Converter>> converters;
private final SortedSet<PluginDesc<Transformation>> transformations;
@ -61,6 +62,7 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -61,6 +62,7 @@ public class DelegatingClassLoader extends URLClassLoader {
super(new URL[0], parent);
this.pluginPaths = pluginPaths;
this.pluginLoaders = new HashMap<>();
this.aliases = new HashMap<>();
this.activePaths = new HashMap<>();
this.connectors = new TreeSet<>();
this.converters = new TreeSet<>();
@ -89,8 +91,10 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -89,8 +91,10 @@ public class DelegatingClassLoader extends URLClassLoader {
public ClassLoader connectorLoader(String connectorClassOrAlias) {
log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias);
SortedMap<PluginDesc<?>, ClassLoader> inner =
pluginLoaders.get(connectorClassOrAlias);
String fullName = aliases.containsKey(connectorClassOrAlias)
? aliases.get(connectorClassOrAlias)
: connectorClassOrAlias;
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner == null) {
log.error(
"Plugin class loader for connector: '{}' was not found. Returning: {}",
@ -137,23 +141,16 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -137,23 +141,16 @@ public class DelegatingClassLoader extends URLClassLoader {
for (String configPath : pluginPaths) {
path = configPath;
Path pluginPath = Paths.get(path).toAbsolutePath();
// Update for exception handling
path = pluginPath.toString();
// Currently 'plugin.paths' property is a list of top-level directories
// containing plugins
if (Files.isDirectory(pluginPath)) {
for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
log.info("Loading plugin from: {}", pluginLocation);
URL[] urls = PluginUtils.pluginUrls(pluginLocation).toArray(new URL[0]);
if (log.isDebugEnabled()) {
log.debug("Loading plugin urls: {}", Arrays.toString(urls));
}
PluginClassLoader loader = newPluginClassLoader(
pluginLocation.toUri().toURL(),
urls,
this
);
scanUrlsAndAddPlugins(loader, urls, pluginLocation);
registerPlugin(pluginLocation);
}
} else if (PluginUtils.isArchive(pluginPath)) {
registerPlugin(pluginPath);
}
}
@ -165,15 +162,34 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -165,15 +162,34 @@ public class DelegatingClassLoader extends URLClassLoader {
null
);
} catch (InvalidPathException | MalformedURLException e) {
log.error("Invalid path in plugin path: {}. Ignoring.", path);
log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
} catch (IOException e) {
log.error("Could not get listing for plugin path: {}. Ignoring.", path);
log.error("Could not get listing for plugin path: {}. Ignoring.", path, e);
} catch (InstantiationException | IllegalAccessException e) {
log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e);
}
addAllAliases();
}
private void registerPlugin(Path pluginLocation)
throws InstantiationException, IllegalAccessException, IOException {
log.info("Loading plugin from: {}", pluginLocation);
List<URL> pluginUrls = new ArrayList<>();
for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
pluginUrls.add(path.toUri().toURL());
}
URL[] urls = pluginUrls.toArray(new URL[0]);
if (log.isDebugEnabled()) {
log.debug("Loading plugin urls: {}", Arrays.toString(urls));
}
PluginClassLoader loader = newPluginClassLoader(
pluginLocation.toUri().toURL(),
urls,
this
);
scanUrlsAndAddPlugins(loader, urls, pluginLocation);
}
private void scanUrlsAndAddPlugins(
ClassLoader loader,
URL[] urls,
@ -245,28 +261,17 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -245,28 +261,17 @@ public class DelegatingClassLoader extends URLClassLoader {
return super.loadClass(name, resolve);
}
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
String fullName = aliases.containsKey(name) ? aliases.get(name) : name;
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner != null) {
log.trace("Retrieving loaded class '{}' from '{}'", name, inner.get(inner.lastKey()));
ClassLoader pluginLoader = inner.get(inner.lastKey());
log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
return pluginLoader instanceof PluginClassLoader
? ((PluginClassLoader) pluginLoader).loadClass(name, resolve)
: super.loadClass(name, resolve);
? ((PluginClassLoader) pluginLoader).loadClass(fullName, resolve)
: super.loadClass(fullName, resolve);
}
Class<?> klass = null;
for (PluginClassLoader loader : activePaths.values()) {
try {
klass = loader.loadClass(name, resolve);
break;
} catch (ClassNotFoundException e) {
// Not found in this loader.
}
}
if (klass == null) {
return super.loadClass(name, resolve);
}
return klass;
return super.loadClass(fullName, resolve);
}
private void addAllAliases() {
@ -280,12 +285,11 @@ public class DelegatingClassLoader extends URLClassLoader { @@ -280,12 +285,11 @@ public class DelegatingClassLoader extends URLClassLoader {
if (PluginUtils.isAliasUnique(plugin, plugins)) {
String simple = PluginUtils.simpleName(plugin);
String pruned = PluginUtils.prunedName(plugin);
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(plugin.className());
pluginLoaders.put(simple, inner);
aliases.put(simple, plugin.className());
if (simple.equals(pruned)) {
log.info("Added alias '{}' to plugin '{}'", simple, plugin.className());
} else {
pluginLoaders.put(pruned, inner);
aliases.put(pruned, plugin.className());
log.info(
"Added aliases '{}' and '{}' to plugin '{}'",
simple,

17
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java

@ -23,7 +23,7 @@ import java.net.URL; @@ -23,7 +23,7 @@ import java.net.URL;
import java.net.URLClassLoader;
public class PluginClassLoader extends URLClassLoader {
private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
private static final Logger log = LoggerFactory.getLogger(PluginClassLoader.class);
private final URL pluginLocation;
public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
@ -49,16 +49,17 @@ public class PluginClassLoader extends URLClassLoader { @@ -49,16 +49,17 @@ public class PluginClassLoader extends URLClassLoader {
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
Class<?> klass = findLoadedClass(name);
if (klass == null) {
if (PluginUtils.shouldLoadInIsolation(name)) {
try {
try {
if (PluginUtils.shouldLoadInIsolation(name)) {
klass = findClass(name);
} catch (ClassNotFoundException e) {
// Not found in loader's path. Search in parents.
}
} catch (ClassNotFoundException e) {
// Not found in loader's path. Search in parents.
log.trace("Class '{}' not found. Delegating to parent", name);
}
if (klass == null) {
klass = super.loadClass(name, false);
}
}
if (klass == null) {
klass = super.loadClass(name, false);
}
if (resolve) {
resolveClass(klass);

188
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java

@ -16,26 +16,107 @@ @@ -16,26 +16,107 @@
*/
package org.apache.kafka.connect.runtime.isolation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
public class PluginUtils {
private static final Logger log = LoggerFactory.getLogger(PluginUtils.class);
// Be specific about javax packages and exclude those existing in Java SE and Java EE libraries.
private static final String BLACKLIST = "^(?:"
+ "java"
+ "|javax"
+ "|org\\.omg"
+ "|javax\\.accessibility"
+ "|javax\\.activation"
+ "|javax\\.activity"
+ "|javax\\.annotation"
+ "|javax\\.batch\\.api"
+ "|javax\\.batch\\.operations"
+ "|javax\\.batch\\.runtime"
+ "|javax\\.crypto"
+ "|javax\\.decorator"
+ "|javax\\.ejb"
+ "|javax\\.el"
+ "|javax\\.enterprise\\.concurrent"
+ "|javax\\.enterprise\\.context"
+ "|javax\\.enterprise\\.context\\.spi"
+ "|javax\\.enterprise\\.deploy\\.model"
+ "|javax\\.enterprise\\.deploy\\.shared"
+ "|javax\\.enterprise\\.deploy\\.spi"
+ "|javax\\.enterprise\\.event"
+ "|javax\\.enterprise\\.inject"
+ "|javax\\.enterprise\\.inject\\.spi"
+ "|javax\\.enterprise\\.util"
+ "|javax\\.faces"
+ "|javax\\.imageio"
+ "|javax\\.inject"
+ "|javax\\.interceptor"
+ "|javax\\.jms"
+ "|javax\\.json"
+ "|javax\\.jws"
+ "|javax\\.lang\\.model"
+ "|javax\\.mail"
+ "|javax\\.management"
+ "|javax\\.management\\.j2ee"
+ "|javax\\.naming"
+ "|javax\\.net"
+ "|javax\\.persistence"
+ "|javax\\.print"
+ "|javax\\.resource"
+ "|javax\\.rmi"
+ "|javax\\.script"
+ "|javax\\.security\\.auth"
+ "|javax\\.security\\.auth\\.message"
+ "|javax\\.security\\.cert"
+ "|javax\\.security\\.jacc"
+ "|javax\\.security\\.sasl"
+ "|javax\\.servlet"
+ "|javax\\.sound\\.midi"
+ "|javax\\.sound\\.sampled"
+ "|javax\\.sql"
+ "|javax\\.swing"
+ "|javax\\.tools"
+ "|javax\\.transaction"
+ "|javax\\.validation"
+ "|javax\\.websocket"
+ "|javax\\.ws\\.rs"
+ "|javax\\.xml"
+ "|javax\\.xml\\.bind"
+ "|javax\\.xml\\.registry"
+ "|javax\\.xml\\.rpc"
+ "|javax\\.xml\\.soap"
+ "|javax\\.xml\\.ws"
+ "|org\\.ietf\\.jgss"
+ "|org\\.omg\\.CORBA"
+ "|org\\.omg\\.CosNaming"
+ "|org\\.omg\\.Dynamic"
+ "|org\\.omg\\.DynamicAny"
+ "|org\\.omg\\.IOP"
+ "|org\\.omg\\.Messaging"
+ "|org\\.omg\\.PortableInterceptor"
+ "|org\\.omg\\.PortableServer"
+ "|org\\.omg\\.SendingContext"
+ "|org\\.omg\\.stub\\.java\\.rmi"
+ "|org\\.w3c\\.dom"
+ "|org\\.xml\\.sax"
+ "|org\\.apache\\.kafka\\.common"
+ "|org\\.apache\\.kafka\\.connect"
+ "|org\\.apache\\.log4j"
+ "|org\\.slf4j"
+ ")\\..*$";
private static final String WHITELIST = "^org\\.apache\\.kafka\\.connect\\.(?:"
@ -50,7 +131,7 @@ public class PluginUtils { @@ -50,7 +131,7 @@ public class PluginUtils {
.Filter<Path>() {
@Override
public boolean accept(Path path) throws IOException {
return Files.isDirectory(path) || PluginUtils.isJar(path);
return Files.isDirectory(path) || isArchive(path) || isClassFile(path);
}
};
@ -63,32 +144,16 @@ public class PluginUtils { @@ -63,32 +144,16 @@ public class PluginUtils {
return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
}
public static boolean isJar(Path path) {
public static boolean isArchive(Path path) {
return path.toString().toLowerCase(Locale.ROOT).endsWith(".jar");
}
public static List<URL> pluginUrls(Path pluginPath) throws IOException {
List<URL> urls = new ArrayList<>();
if (PluginUtils.isJar(pluginPath)) {
urls.add(pluginPath.toUri().toURL());
} else if (Files.isDirectory(pluginPath)) {
try (
DirectoryStream<Path> listing = Files.newDirectoryStream(
pluginPath,
PLUGIN_PATH_FILTER
)
) {
for (Path jar : listing) {
urls.add(jar.toUri().toURL());
}
}
}
return urls;
public static boolean isClassFile(Path path) {
return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
}
public static List<Path> pluginLocations(Path topPath) throws IOException {
List<Path> locations = new ArrayList<>();
// Non-recursive for now. Plugin directories or jars need to be exactly under the topPath.
try (
DirectoryStream<Path> listing = Files.newDirectoryStream(
topPath,
@ -102,6 +167,71 @@ public class PluginUtils { @@ -102,6 +167,71 @@ public class PluginUtils {
return locations;
}
public static List<Path> pluginUrls(Path topPath) throws IOException {
boolean containsClassFiles = false;
Set<Path> archives = new HashSet<>();
LinkedList<DirectoryEntry> dfs = new LinkedList<>();
Set<Path> visited = new HashSet<>();
if (isArchive(topPath)) {
return Collections.singletonList(topPath);
}
DirectoryStream<Path> topListing = Files.newDirectoryStream(
topPath,
PLUGIN_PATH_FILTER
);
dfs.push(new DirectoryEntry(topListing));
visited.add(topPath);
try {
while (!dfs.isEmpty()) {
Iterator<Path> neighbors = dfs.peek().iterator;
if (!neighbors.hasNext()) {
dfs.pop().stream.close();
continue;
}
Path adjacent = neighbors.next();
if (Files.isSymbolicLink(adjacent)) {
Path absolute = Files.readSymbolicLink(adjacent).toRealPath();
if (Files.exists(absolute)) {
adjacent = absolute;
} else {
continue;
}
}
if (!visited.contains(adjacent)) {
visited.add(adjacent);
if (isArchive(adjacent)) {
archives.add(adjacent);
} else if (isClassFile(adjacent)) {
containsClassFiles = true;
} else {
DirectoryStream<Path> listing = Files.newDirectoryStream(
adjacent,
PLUGIN_PATH_FILTER
);
dfs.push(new DirectoryEntry(listing));
}
}
}
} finally {
while (!dfs.isEmpty()) {
dfs.pop().stream.close();
}
}
if (containsClassFiles) {
if (archives.isEmpty()) {
return Collections.singletonList(topPath);
}
log.warn("Plugin path contains both java archives and class files. Returning only the"
+ " archives");
}
return Arrays.asList(archives.toArray(new Path[0]));
}
public static String simpleName(PluginDesc<?> plugin) {
return plugin.pluginClass().getSimpleName();
}
@ -144,4 +274,14 @@ public class PluginUtils { @@ -144,4 +274,14 @@ public class PluginUtils {
return simple;
}
private static class DirectoryEntry {
DirectoryStream<Path> stream;
Iterator<Path> iterator;
DirectoryEntry(DirectoryStream<Path> stream) {
this.stream = stream;
this.iterator = stream.iterator();
}
}
}

10
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java

@ -39,20 +39,22 @@ public class PluginUtilsTest { @@ -39,20 +39,22 @@ public class PluginUtilsTest {
assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.String"));
assertFalse(PluginUtils.shouldLoadInIsolation("java.util.HashMap$Entry"));
assertFalse(PluginUtils.shouldLoadInIsolation("java.io.Serializable"));
assertFalse(PluginUtils.shouldLoadInIsolation("javax."));
assertFalse(PluginUtils.shouldLoadInIsolation("javax.rmi."));
assertFalse(PluginUtils.shouldLoadInIsolation(
"javax.management.loading.ClassLoaderRepository")
);
assertFalse(PluginUtils.shouldLoadInIsolation("org.omg."));
assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA."));
assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA.Object"));
assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom."));
assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom.traversal.TreeWalker"));
assertFalse(PluginUtils.shouldLoadInIsolation("org.xml.sax."));
assertFalse(PluginUtils.shouldLoadInIsolation("org.xml.sax.EntityResolver"));
}
@Test
public void testThirdPartyClasses() throws Exception {
assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j."));
assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j.Level"));
assertFalse(PluginUtils.shouldLoadInIsolation("org.slf4j."));
assertFalse(PluginUtils.shouldLoadInIsolation("org.slf4j.LoggerFactory"));
}
@Test

Loading…
Cancel
Save