Browse Source

KAFKA-15150: Add ServiceLoaderScanner implementation (#13971)

Reviewers: Chris Egerton <chris.egerton@aiven.io>
pull/14061/head
Greg Harris 1 year ago committed by GitHub
parent
commit
f6e7aa3763
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 70
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
  2. 4
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
  3. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
  4. 54
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
  5. 80
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java
  6. 97
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
  7. 2
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
  8. 16
      connect/runtime/src/test/resources/test-plugins/aliased-static-field/META-INF/services/org.apache.kafka.connect.storage.Converter
  9. 16
      connect/runtime/src/test/resources/test-plugins/always-throw-exception/META-INF/services/org.apache.kafka.connect.storage.Converter
  10. 21
      connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
  11. 19
      connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter
  12. 17
      connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/META-INF/services/org.apache.kafka.connect.storage.Converter
  13. 16
      connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/META-INF/services/org.apache.kafka.connect.storage.Converter
  14. 16
      connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/META-INF/services/org.apache.kafka.connect.storage.Converter
  15. 16
      connect/runtime/src/test/resources/test-plugins/sampling-configurable/META-INF/services/org.apache.kafka.connect.storage.Converter
  16. 16
      connect/runtime/src/test/resources/test-plugins/sampling-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
  17. 16
      connect/runtime/src/test/resources/test-plugins/sampling-converter/META-INF/services/org.apache.kafka.connect.storage.Converter
  18. 16
      connect/runtime/src/test/resources/test-plugins/sampling-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
  19. 16
      connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/org.apache.kafka.connect.storage.Converter
  20. 16
      connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/META-INF/services/org.apache.kafka.connect.storage.Converter

70
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java

@ -28,11 +28,13 @@ import java.util.ArrayList; @@ -28,11 +28,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Supplier;
/**
* Superclass for plugin discovery implementations.
@ -118,35 +120,79 @@ public abstract class PluginScanner { @@ -118,35 +120,79 @@ public abstract class PluginScanner {
}
@SuppressWarnings({"rawtypes", "unchecked"})
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
return new PluginDesc(plugin, version, loader);
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
return new PluginDesc(plugin, version, source.loader());
}
@SuppressWarnings("unchecked")
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
SortedSet<PluginDesc<T>> result = new TreeSet<>();
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, source.loader());
Iterator<T> iterator = serviceLoader.iterator();
while (handleLinkageError(klass, source, iterator::hasNext)) {
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
T pluginImpl;
try {
pluginImpl = iterator.next();
pluginImpl = handleLinkageError(klass, source, iterator::next);
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
log.error("Failed to discover {} in {}{}",
klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
continue;
}
Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
if (pluginKlass.getClassLoader() != loader) {
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
continue;
}
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
}
}
return result;
}
/**
* Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
*
* @param klass The plugin superclass which is being loaded
* @param function A function on a {@link ServiceLoader}'s {@link Iterator} which may throw {@link LinkageError}
* @return the return value of function
* @throws Error errors thrown by the passed-in function
* @param <T> Type being iterated over by the ServiceLoader
* @param <U> Return value of the passed-in function
*/
private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
// It's difficult to know for sure if the iterator was able to advance past the first broken
// plugin class, or if it will continue to fail on that broken class for any subsequent calls
// to Iterator::hasNext or Iterator::next
// For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
// the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
// in the JDK standard library ServiceLoader implementation is unlikely to land
LinkageError lastError = null;
// Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin,
// but the LinkageError varies between calls. This limit is chosen to be higher than the typical number
// of plugins in a single plugin location, and to limit the amount of log-spam on startup.
for (int i = 0; i < 100; i++) {
try {
return function.get();
} catch (LinkageError t) {
// As an optimization, hide subsequent error logs if two consecutive errors look similar.
// This reduces log-spam for iterators which cannot advance and rethrow the same exception.
if (lastError == null
|| !Objects.equals(lastError.getClass(), t.getClass())
|| !Objects.equals(lastError.getMessage(), t.getMessage())) {
log.error("Failed to discover {} in {}{}",
klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
}
lastError = t;
}
}
log.error("Received excessive ServiceLoader errors: assuming the runtime ServiceLoader implementation cannot " +
"skip faulty implementations. Use a different JRE, or resolve LinkageErrors for plugins in {}",
source.location(), lastError);
throw lastError;
}
protected static <T> String versionFor(T pluginImpl) {
try {
if (pluginImpl instanceof Versioned) {
@ -169,6 +215,8 @@ public abstract class PluginScanner { @@ -169,6 +215,8 @@ public abstract class PluginScanner {
return ": Failed to statically initialize plugin class";
} else if (t instanceof InvocationTargetException) {
return ": Failed to invoke plugin constructor";
} else if (t instanceof LinkageError) {
return ": Plugin class has a dependency which is missing or invalid";
} else {
return "";
}

4
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java

@ -18,11 +18,13 @@ package org.apache.kafka.connect.runtime.isolation; @@ -18,11 +18,13 @@ package org.apache.kafka.connect.runtime.isolation;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Objects;
public class PluginSource {
public static final Path CLASSPATH = Paths.get("classpath");
private final Path location;
private final ClassLoader loader;
private final URL[] urls;
@ -46,7 +48,7 @@ public class PluginSource { @@ -46,7 +48,7 @@ public class PluginSource {
}
public boolean isolated() {
return loader instanceof PluginClassLoader;
return location != CLASSPATH;
}
@Override

2
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java

@ -353,7 +353,7 @@ public class PluginUtils { @@ -353,7 +353,7 @@ public class PluginUtils {
List<URL> parentUrls = new ArrayList<>();
parentUrls.addAll(ClasspathHelper.forJavaClassPath());
parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent()));
pluginSources.add(new PluginSource(null, classLoader.getParent(), parentUrls.toArray(new URL[0])));
pluginSources.add(new PluginSource(PluginSource.CLASSPATH, classLoader.getParent(), parentUrls.toArray(new URL[0])));
return pluginSources;
}

54
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java

@ -43,6 +43,7 @@ import java.util.TreeSet; @@ -43,6 +43,7 @@ import java.util.TreeSet;
* <p>This implements the legacy discovery strategy, which uses a combination of reflection and service loading in
* order to discover plugins. Specifically, a plugin appears in the scan result if all the following conditions are true:
* <ul>
* <li>The class and direct dependencies can be loaded</li>
* <li>The class is concrete</li>
* <li>The class is public</li>
* <li>The class has a no-args constructor</li>
@ -60,7 +61,9 @@ import java.util.TreeSet; @@ -60,7 +61,9 @@ import java.util.TreeSet;
* </li>
* </ul>
* <p>Note: This scanner has a runtime proportional to the number of overall classes in the passed-in
* {@link PluginSource} objects, which may be significant for plugins with large dependencies.
* {@link PluginSource} objects, which may be significant for plugins with large dependencies. For a more performant
* implementation, consider using {@link ServiceLoaderScanner} and follow migration instructions for
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery">KIP-898</a>.
*/
public class ReflectionScanner extends PluginScanner {
@ -73,66 +76,67 @@ public class ReflectionScanner extends PluginScanner { @@ -73,66 +76,67 @@ public class ReflectionScanner extends PluginScanner {
@Override
protected PluginScanResult scanPlugins(PluginSource source) {
ClassLoader loader = source.loader();
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.setClassLoaders(new ClassLoader[]{source.loader()});
builder.addUrls(source.urls());
builder.setScanners(Scanners.SubTypes);
builder.setParallel(true);
Reflections reflections = new Reflections(builder);
return new PluginScanResult(
getPluginDesc(reflections, SinkConnector.class, loader),
getPluginDesc(reflections, SourceConnector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getTransformationPluginDesc(loader, reflections),
getPredicatePluginDesc(loader, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
getPluginDesc(reflections, SinkConnector.class, source),
getPluginDesc(reflections, SourceConnector.class, source),
getPluginDesc(reflections, Converter.class, source),
getPluginDesc(reflections, HeaderConverter.class, source),
getTransformationPluginDesc(source, reflections),
getPredicatePluginDesc(source, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, source),
getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, loader);
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, source);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, loader);
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, source);
}
private <T> SortedSet<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
ClassLoader loader
PluginSource source
) {
Set<Class<? extends T>> plugins;
try {
plugins = reflections.getSubTypesOf(klass);
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any classes for URLs: " +
reflections.getConfiguration().getUrls(), e);
log.debug("Reflections scanner could not find any {} in {} for URLs: {}",
klass, source.location(), source.urls(), e);
return Collections.emptySortedSet();
}
SortedSet<PluginDesc<T>> result = new TreeSet<>();
for (Class<? extends T> pluginKlass : plugins) {
if (!PluginUtils.isConcrete(pluginKlass)) {
log.debug("Skipping {} as it is not concrete implementation", pluginKlass);
log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source.location());
continue;
}
if (pluginKlass.getClassLoader() != loader) {
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
continue;
}
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), loader));
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), source));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e);
log.error("Failed to discover {} in {}: Unable to instantiate {}{}",
klass.getSimpleName(), source.location(), pluginKlass.getSimpleName(),
reflectiveErrorDescription(e), e);
}
}
return result;

80
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java

@ -0,0 +1,80 @@ @@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.ServiceLoader;
import java.util.SortedSet;
/**
* A {@link PluginScanner} implementation which uses {@link ServiceLoader} to discover plugins.
* <p>This implements the modern discovery strategy, which uses only service loading in order to discover plugins.
* Specifically, a plugin appears in the scan result if all the following conditions are true:
* <ul>
* <li>The class and direct dependencies can be loaded</li>
* <li>The class is concrete</li>
* <li>The class is public</li>
* <li>The class has a no-args constructor</li>
* <li>The no-args constructor is public</li>
* <li>Static initialization of the class completes without throwing an exception</li>
* <li>The no-args constructor completes without throwing an exception</li>
* <li>The class is a subclass of {@link SinkConnector}, {@link SourceConnector}, {@link Converter},
* {@link HeaderConverter}, {@link Transformation}, {@link Predicate}, {@link ConfigProvider},
* {@link ConnectRestExtension}, or {@link ConnectorClientConfigOverridePolicy}
* </li>
* <li>The class has a {@link ServiceLoader} compatible manifest file or module declaration</li>
* </ul>
* <p>Note: This scanner can only find plugins with corresponding {@link ServiceLoader} manifests, which are
* not required to be packaged with plugins. This has the effect that some plugins discoverable by the
* {@link ReflectionScanner} may not be visible with this implementation.
*/
public class ServiceLoaderScanner extends PluginScanner {
@Override
protected PluginScanResult scanPlugins(PluginSource source) {
return new PluginScanResult(
getServiceLoaderPluginDesc(SinkConnector.class, source),
getServiceLoaderPluginDesc(SourceConnector.class, source),
getServiceLoaderPluginDesc(Converter.class, source),
getServiceLoaderPluginDesc(HeaderConverter.class, source),
getTransformationPluginDesc(source),
getPredicatePluginDesc(source),
getServiceLoaderPluginDesc(ConfigProvider.class, source),
getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Predicate.class, source);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Transformation.class, source);
}
}

97
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java

@ -20,79 +20,113 @@ package org.apache.kafka.connect.runtime.isolation; @@ -20,79 +20,113 @@ package org.apache.kafka.connect.runtime.isolation;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class PluginScannerTest {
private enum ScannerType { Reflection, ServiceLoader };
@Rule
public TemporaryFolder pluginDir = new TemporaryFolder();
private final PluginScanner scanner;
@Parameterized.Parameters
public static Collection<Object[]> parameters() {
List<Object[]> values = new ArrayList<>();
for (ScannerType type : ScannerType.values()) {
values.add(new Object[]{type});
}
return values;
}
public PluginScannerTest(ScannerType scannerType) {
switch (scannerType) {
case Reflection:
this.scanner = new ReflectionScanner();
break;
case ServiceLoader:
this.scanner = new ServiceLoaderScanner();
break;
default:
throw new IllegalArgumentException("Unknown type " + scannerType);
}
}
@Test
public void testLoadingUnloadedPluginClass() {
DelegatingClassLoader classLoader = initClassLoader(
public void testScanningEmptyPluginPath() {
PluginScanResult result = scan(
Collections.emptyList()
);
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName));
}
assertTrue(result.isEmpty());
}
@Test
public void testLoadingPluginClass() throws ClassNotFoundException {
DelegatingClassLoader classLoader = initClassLoader(
public void testScanningPluginClasses() {
PluginScanResult result = scan(
TestPlugins.pluginPath()
);
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
}
Set<String> classes = new HashSet<>();
result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
Set<String> expectedClasses = new HashSet<>(TestPlugins.pluginClasses());
assertEquals(expectedClasses, classes);
}
@Test
public void testLoadingInvalidUberJar() throws Exception {
public void testScanningInvalidUberJar() throws Exception {
pluginDir.newFile("invalid.jar");
initClassLoader(
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}
@Test
public void testLoadingPluginDirContainsInvalidJarsOnly() throws Exception {
public void testScanningPluginDirContainsInvalidJarsOnly() throws Exception {
pluginDir.newFolder("my-plugin");
pluginDir.newFile("my-plugin/invalid.jar");
initClassLoader(
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}
@Test
public void testLoadingNoPlugins() {
initClassLoader(
public void testScanningNoPlugins() {
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}
@Test
public void testLoadingPluginDirEmpty() throws Exception {
public void testScanningPluginDirEmpty() throws Exception {
pluginDir.newFolder("my-plugin");
initClassLoader(
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}
@Test
public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
public void testScanningMixOfValidAndInvalidPlugins() throws Exception {
pluginDir.newFile("invalid.jar");
pluginDir.newFolder("my-plugin");
pluginDir.newFile("my-plugin/invalid.jar");
@ -102,22 +136,19 @@ public class PluginScannerTest { @@ -102,22 +136,19 @@ public class PluginScannerTest {
Files.copy(source, pluginPath.resolve(source.getFileName()));
}
DelegatingClassLoader classLoader = initClassLoader(
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
}
Set<String> classes = new HashSet<>();
result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
Set<String> expectedClasses = new HashSet<>(TestPlugins.pluginClasses());
assertEquals(expectedClasses, classes);
}
private DelegatingClassLoader initClassLoader(List<Path> pluginLocations) {
private PluginScanResult scan(List<Path> pluginLocations) {
ClassLoaderFactory factory = new ClassLoaderFactory();
DelegatingClassLoader classLoader = factory.newDelegatingClassLoader(DelegatingClassLoader.class.getClassLoader());
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, classLoader, factory);
PluginScanResult scanResult = new ReflectionScanner().discoverPlugins(pluginSources);
classLoader.installDiscoveredPlugins(scanResult);
return classLoader;
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory);
return scanner.discoverPlugins(pluginSources);
}
}

2
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java

@ -136,7 +136,7 @@ public class TestPlugins { @@ -136,7 +136,7 @@ public class TestPlugins {
/**
* A plugin which is incorrectly packaged, which throws an exception from the {@link Versioned#version()} method.
*/
BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR("bad-packaging", "test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER),
BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR("bad-packaging", "test.plugins.VersionMethodThrowsConnector", true, REMOVE_CLASS_FILTER),
/**
* A plugin which is incorrectly packaged, which throws an exception from default constructor.
*/

16
connect/runtime/src/test/resources/test-plugins/aliased-static-field/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.AliasedStaticField

16
connect/runtime/src/test/resources/test-plugins/always-throw-exception/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.AlwaysThrowException

21
connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector

@ -0,0 +1,21 @@ @@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.DefaultConstructorPrivateConnector
test.plugins.DefaultConstructorThrowsConnector
test.plugins.NoDefaultConstructorConnector
test.plugins.StaticInitializerThrowsConnector
test.plugins.OuterClass$InnerClass
test.plugins.VersionMethodThrowsConnector

19
connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,19 @@ @@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.CoLocatedPlugin
test.plugins.MissingSuperclassConverter
test.plugins.NoDefaultConstructorConverter
test.plugins.NoDefaultConstructorConverter

17
connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,17 @@ @@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.ThingOne
test.plugins.ThingTwo

16
connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.ReadVersionFromResource

16
connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.ReadVersionFromResource

16
connect/runtime/src/test/resources/test-plugins/sampling-configurable/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.SamplingConfigurable

16
connect/runtime/src/test/resources/test-plugins/sampling-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.SamplingConnector

16
connect/runtime/src/test/resources/test-plugins/sampling-converter/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.SamplingConverter

16
connect/runtime/src/test/resources/test-plugins/sampling-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.SamplingHeaderConverter

16
connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.ServiceLoaderPlugin

16
connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/META-INF/services/org.apache.kafka.connect.storage.Converter

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.SubclassOfClasspathConverter
Loading…
Cancel
Save