diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java index 5859af32d47..474a92f5cf5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java @@ -28,11 +28,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.ServiceConfigurationError; import java.util.ServiceLoader; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Supplier; /** * Superclass for plugin discovery implementations. @@ -118,35 +120,79 @@ public abstract class PluginScanner { } @SuppressWarnings({"rawtypes", "unchecked"}) - protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) { - return new PluginDesc(plugin, version, loader); + protected PluginDesc pluginDesc(Class plugin, String version, PluginSource source) { + return new PluginDesc(plugin, version, source.loader()); } @SuppressWarnings("unchecked") - protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { + protected SortedSet> getServiceLoaderPluginDesc(Class klass, PluginSource source) { SortedSet> result = new TreeSet<>(); - ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); - for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { - try (LoaderSwap loaderSwap = withClassLoader(loader)) { + ServiceLoader serviceLoader = ServiceLoader.load(klass, source.loader()); + Iterator iterator = serviceLoader.iterator(); + while (handleLinkageError(klass, source, iterator::hasNext)) { + try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { T pluginImpl; try { - pluginImpl = iterator.next(); + pluginImpl = handleLinkageError(klass, source, iterator::next); } catch (ServiceConfigurationError t) { - log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); + log.error("Failed to discover {} in {}{}", + klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); continue; } Class pluginKlass = (Class) pluginImpl.getClass(); - if (pluginKlass.getClassLoader() != loader) { + if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader); + pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location()); continue; } - result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader)); + result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source)); } } return result; } + /** + * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s. + * + * @param klass The plugin superclass which is being loaded + * @param function A function on a {@link ServiceLoader}'s {@link Iterator} which may throw {@link LinkageError} + * @return the return value of function + * @throws Error errors thrown by the passed-in function + * @param Type being iterated over by the ServiceLoader + * @param Return value of the passed-in function + */ + private U handleLinkageError(Class klass, PluginSource source, Supplier function) { + // It's difficult to know for sure if the iterator was able to advance past the first broken + // plugin class, or if it will continue to fail on that broken class for any subsequent calls + // to Iterator::hasNext or Iterator::next + // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes + // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix + // in the JDK standard library ServiceLoader implementation is unlikely to land + LinkageError lastError = null; + // Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin, + // but the LinkageError varies between calls. This limit is chosen to be higher than the typical number + // of plugins in a single plugin location, and to limit the amount of log-spam on startup. + for (int i = 0; i < 100; i++) { + try { + return function.get(); + } catch (LinkageError t) { + // As an optimization, hide subsequent error logs if two consecutive errors look similar. + // This reduces log-spam for iterators which cannot advance and rethrow the same exception. + if (lastError == null + || !Objects.equals(lastError.getClass(), t.getClass()) + || !Objects.equals(lastError.getMessage(), t.getMessage())) { + log.error("Failed to discover {} in {}{}", + klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + } + lastError = t; + } + } + log.error("Received excessive ServiceLoader errors: assuming the runtime ServiceLoader implementation cannot " + + "skip faulty implementations. Use a different JRE, or resolve LinkageErrors for plugins in {}", + source.location(), lastError); + throw lastError; + } + protected static String versionFor(T pluginImpl) { try { if (pluginImpl instanceof Versioned) { @@ -169,6 +215,8 @@ public abstract class PluginScanner { return ": Failed to statically initialize plugin class"; } else if (t instanceof InvocationTargetException) { return ": Failed to invoke plugin constructor"; + } else if (t instanceof LinkageError) { + return ": Plugin class has a dependency which is missing or invalid"; } else { return ""; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java index ffedd98d6c3..c7123d83661 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java @@ -18,11 +18,13 @@ package org.apache.kafka.connect.runtime.isolation; import java.net.URL; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Objects; public class PluginSource { + public static final Path CLASSPATH = Paths.get("classpath"); private final Path location; private final ClassLoader loader; private final URL[] urls; @@ -46,7 +48,7 @@ public class PluginSource { } public boolean isolated() { - return loader instanceof PluginClassLoader; + return location != CLASSPATH; } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index 38c07d7b12b..e88bfa1b03d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -353,7 +353,7 @@ public class PluginUtils { List parentUrls = new ArrayList<>(); parentUrls.addAll(ClasspathHelper.forJavaClassPath()); parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent())); - pluginSources.add(new PluginSource(null, classLoader.getParent(), parentUrls.toArray(new URL[0]))); + pluginSources.add(new PluginSource(PluginSource.CLASSPATH, classLoader.getParent(), parentUrls.toArray(new URL[0]))); return pluginSources; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index ace4015136d..e38fefc78c9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -43,6 +43,7 @@ import java.util.TreeSet; *

This implements the legacy discovery strategy, which uses a combination of reflection and service loading in * order to discover plugins. Specifically, a plugin appears in the scan result if all the following conditions are true: *

    + *
  • The class and direct dependencies can be loaded
  • *
  • The class is concrete
  • *
  • The class is public
  • *
  • The class has a no-args constructor
  • @@ -60,7 +61,9 @@ import java.util.TreeSet; * *
*

Note: This scanner has a runtime proportional to the number of overall classes in the passed-in - * {@link PluginSource} objects, which may be significant for plugins with large dependencies. + * {@link PluginSource} objects, which may be significant for plugins with large dependencies. For a more performant + * implementation, consider using {@link ServiceLoaderScanner} and follow migration instructions for + * KIP-898. */ public class ReflectionScanner extends PluginScanner { @@ -73,66 +76,67 @@ public class ReflectionScanner extends PluginScanner { @Override protected PluginScanResult scanPlugins(PluginSource source) { - ClassLoader loader = source.loader(); ConfigurationBuilder builder = new ConfigurationBuilder(); - builder.setClassLoaders(new ClassLoader[]{loader}); + builder.setClassLoaders(new ClassLoader[]{source.loader()}); builder.addUrls(source.urls()); builder.setScanners(Scanners.SubTypes); builder.setParallel(true); Reflections reflections = new Reflections(builder); return new PluginScanResult( - getPluginDesc(reflections, SinkConnector.class, loader), - getPluginDesc(reflections, SourceConnector.class, loader), - getPluginDesc(reflections, Converter.class, loader), - getPluginDesc(reflections, HeaderConverter.class, loader), - getTransformationPluginDesc(loader, reflections), - getPredicatePluginDesc(loader, reflections), - getServiceLoaderPluginDesc(ConfigProvider.class, loader), - getServiceLoaderPluginDesc(ConnectRestExtension.class, loader), - getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader) + getPluginDesc(reflections, SinkConnector.class, source), + getPluginDesc(reflections, SourceConnector.class, source), + getPluginDesc(reflections, Converter.class, source), + getPluginDesc(reflections, HeaderConverter.class, source), + getTransformationPluginDesc(source, reflections), + getPredicatePluginDesc(source, reflections), + getServiceLoaderPluginDesc(ConfigProvider.class, source), + getServiceLoaderPluginDesc(ConnectRestExtension.class, source), + getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source) ); } @SuppressWarnings({"unchecked"}) - private SortedSet>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) { - return (SortedSet>>) (SortedSet) getPluginDesc(reflections, Predicate.class, loader); + private SortedSet>> getPredicatePluginDesc(PluginSource source, Reflections reflections) { + return (SortedSet>>) (SortedSet) getPluginDesc(reflections, Predicate.class, source); } @SuppressWarnings({"unchecked"}) - private SortedSet>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) { - return (SortedSet>>) (SortedSet) getPluginDesc(reflections, Transformation.class, loader); + private SortedSet>> getTransformationPluginDesc(PluginSource source, Reflections reflections) { + return (SortedSet>>) (SortedSet) getPluginDesc(reflections, Transformation.class, source); } private SortedSet> getPluginDesc( Reflections reflections, Class klass, - ClassLoader loader + PluginSource source ) { Set> plugins; try { plugins = reflections.getSubTypesOf(klass); } catch (ReflectionsException e) { - log.debug("Reflections scanner could not find any classes for URLs: " + - reflections.getConfiguration().getUrls(), e); + log.debug("Reflections scanner could not find any {} in {} for URLs: {}", + klass, source.location(), source.urls(), e); return Collections.emptySortedSet(); } SortedSet> result = new TreeSet<>(); for (Class pluginKlass : plugins) { if (!PluginUtils.isConcrete(pluginKlass)) { - log.debug("Skipping {} as it is not concrete implementation", pluginKlass); + log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source.location()); continue; } - if (pluginKlass.getClassLoader() != loader) { + if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader); + pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location()); continue; } - try (LoaderSwap loaderSwap = withClassLoader(loader)) { - result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), loader)); + try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { + result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), source)); } catch (ReflectiveOperationException | LinkageError e) { - log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e); + log.error("Failed to discover {} in {}: Unable to instantiate {}{}", + klass.getSimpleName(), source.location(), pluginKlass.getSimpleName(), + reflectiveErrorDescription(e), e); } } return result; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java new file mode 100644 index 00000000000..727a737ff3f --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.rest.ConnectRestExtension; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.predicates.Predicate; + +import java.util.ServiceLoader; +import java.util.SortedSet; + +/** + * A {@link PluginScanner} implementation which uses {@link ServiceLoader} to discover plugins. + *

This implements the modern discovery strategy, which uses only service loading in order to discover plugins. + * Specifically, a plugin appears in the scan result if all the following conditions are true: + *

    + *
  • The class and direct dependencies can be loaded
  • + *
  • The class is concrete
  • + *
  • The class is public
  • + *
  • The class has a no-args constructor
  • + *
  • The no-args constructor is public
  • + *
  • Static initialization of the class completes without throwing an exception
  • + *
  • The no-args constructor completes without throwing an exception
  • + *
  • The class is a subclass of {@link SinkConnector}, {@link SourceConnector}, {@link Converter}, + * {@link HeaderConverter}, {@link Transformation}, {@link Predicate}, {@link ConfigProvider}, + * {@link ConnectRestExtension}, or {@link ConnectorClientConfigOverridePolicy} + *
  • + *
  • The class has a {@link ServiceLoader} compatible manifest file or module declaration
  • + *
+ *

Note: This scanner can only find plugins with corresponding {@link ServiceLoader} manifests, which are + * not required to be packaged with plugins. This has the effect that some plugins discoverable by the + * {@link ReflectionScanner} may not be visible with this implementation. + */ +public class ServiceLoaderScanner extends PluginScanner { + + @Override + protected PluginScanResult scanPlugins(PluginSource source) { + return new PluginScanResult( + getServiceLoaderPluginDesc(SinkConnector.class, source), + getServiceLoaderPluginDesc(SourceConnector.class, source), + getServiceLoaderPluginDesc(Converter.class, source), + getServiceLoaderPluginDesc(HeaderConverter.class, source), + getTransformationPluginDesc(source), + getPredicatePluginDesc(source), + getServiceLoaderPluginDesc(ConfigProvider.class, source), + getServiceLoaderPluginDesc(ConnectRestExtension.class, source), + getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source) + ); + } + + @SuppressWarnings({"unchecked"}) + private SortedSet>> getPredicatePluginDesc(PluginSource source) { + return (SortedSet>>) (SortedSet) getServiceLoaderPluginDesc(Predicate.class, source); + } + + @SuppressWarnings({"unchecked"}) + private SortedSet>> getTransformationPluginDesc(PluginSource source) { + return (SortedSet>>) (SortedSet) getServiceLoaderPluginDesc(Transformation.class, source); + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java index 28e3a4a56f6..b9b80fa5110 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java @@ -20,79 +20,113 @@ package org.apache.kafka.connect.runtime.isolation; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class PluginScannerTest { + private enum ScannerType { Reflection, ServiceLoader }; + @Rule public TemporaryFolder pluginDir = new TemporaryFolder(); + private final PluginScanner scanner; + + @Parameterized.Parameters + public static Collection parameters() { + List values = new ArrayList<>(); + for (ScannerType type : ScannerType.values()) { + values.add(new Object[]{type}); + } + return values; + } + + public PluginScannerTest(ScannerType scannerType) { + switch (scannerType) { + case Reflection: + this.scanner = new ReflectionScanner(); + break; + case ServiceLoader: + this.scanner = new ServiceLoaderScanner(); + break; + default: + throw new IllegalArgumentException("Unknown type " + scannerType); + } + } + @Test - public void testLoadingUnloadedPluginClass() { - DelegatingClassLoader classLoader = initClassLoader( + public void testScanningEmptyPluginPath() { + PluginScanResult result = scan( Collections.emptyList() ); - for (String pluginClassName : TestPlugins.pluginClasses()) { - assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName)); - } + assertTrue(result.isEmpty()); } @Test - public void testLoadingPluginClass() throws ClassNotFoundException { - DelegatingClassLoader classLoader = initClassLoader( + public void testScanningPluginClasses() { + PluginScanResult result = scan( TestPlugins.pluginPath() ); - for (String pluginClassName : TestPlugins.pluginClasses()) { - assertNotNull(classLoader.loadClass(pluginClassName)); - assertNotNull(classLoader.pluginClassLoader(pluginClassName)); - } + Set classes = new HashSet<>(); + result.forEach(pluginDesc -> classes.add(pluginDesc.className())); + Set expectedClasses = new HashSet<>(TestPlugins.pluginClasses()); + assertEquals(expectedClasses, classes); } @Test - public void testLoadingInvalidUberJar() throws Exception { + public void testScanningInvalidUberJar() throws Exception { pluginDir.newFile("invalid.jar"); - initClassLoader( + PluginScanResult result = scan( Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) ); + assertTrue(result.isEmpty()); } @Test - public void testLoadingPluginDirContainsInvalidJarsOnly() throws Exception { + public void testScanningPluginDirContainsInvalidJarsOnly() throws Exception { pluginDir.newFolder("my-plugin"); pluginDir.newFile("my-plugin/invalid.jar"); - initClassLoader( + PluginScanResult result = scan( Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) ); + assertTrue(result.isEmpty()); } @Test - public void testLoadingNoPlugins() { - initClassLoader( + public void testScanningNoPlugins() { + PluginScanResult result = scan( Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) ); + assertTrue(result.isEmpty()); } @Test - public void testLoadingPluginDirEmpty() throws Exception { + public void testScanningPluginDirEmpty() throws Exception { pluginDir.newFolder("my-plugin"); - initClassLoader( + PluginScanResult result = scan( Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) ); + assertTrue(result.isEmpty()); } @Test - public void testLoadingMixOfValidAndInvalidPlugins() throws Exception { + public void testScanningMixOfValidAndInvalidPlugins() throws Exception { pluginDir.newFile("invalid.jar"); pluginDir.newFolder("my-plugin"); pluginDir.newFile("my-plugin/invalid.jar"); @@ -102,22 +136,19 @@ public class PluginScannerTest { Files.copy(source, pluginPath.resolve(source.getFileName())); } - DelegatingClassLoader classLoader = initClassLoader( + PluginScanResult result = scan( Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) ); - for (String pluginClassName : TestPlugins.pluginClasses()) { - assertNotNull(classLoader.loadClass(pluginClassName)); - assertNotNull(classLoader.pluginClassLoader(pluginClassName)); - } + Set classes = new HashSet<>(); + result.forEach(pluginDesc -> classes.add(pluginDesc.className())); + Set expectedClasses = new HashSet<>(TestPlugins.pluginClasses()); + assertEquals(expectedClasses, classes); } - private DelegatingClassLoader initClassLoader(List pluginLocations) { + private PluginScanResult scan(List pluginLocations) { ClassLoaderFactory factory = new ClassLoaderFactory(); - DelegatingClassLoader classLoader = factory.newDelegatingClassLoader(DelegatingClassLoader.class.getClassLoader()); - Set pluginSources = PluginUtils.pluginSources(pluginLocations, classLoader, factory); - PluginScanResult scanResult = new ReflectionScanner().discoverPlugins(pluginSources); - classLoader.installDiscoveredPlugins(scanResult); - return classLoader; + Set pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory); + return scanner.discoverPlugins(pluginSources); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index d821185d85d..bd87dd41516 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -136,7 +136,7 @@ public class TestPlugins { /** * A plugin which is incorrectly packaged, which throws an exception from the {@link Versioned#version()} method. */ - BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR("bad-packaging", "test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER), + BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR("bad-packaging", "test.plugins.VersionMethodThrowsConnector", true, REMOVE_CLASS_FILTER), /** * A plugin which is incorrectly packaged, which throws an exception from default constructor. */ diff --git a/connect/runtime/src/test/resources/test-plugins/aliased-static-field/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..1cdad430edd --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.AliasedStaticField diff --git a/connect/runtime/src/test/resources/test-plugins/always-throw-exception/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..084c96f96ca --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.AlwaysThrowException diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 00000000000..afd40b0d63f --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,21 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.DefaultConstructorPrivateConnector +test.plugins.DefaultConstructorThrowsConnector +test.plugins.NoDefaultConstructorConnector +test.plugins.StaticInitializerThrowsConnector +test.plugins.OuterClass$InnerClass +test.plugins.VersionMethodThrowsConnector diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..38da8f33767 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,19 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.CoLocatedPlugin +test.plugins.MissingSuperclassConverter +test.plugins.NoDefaultConstructorConverter +test.plugins.NoDefaultConstructorConverter diff --git a/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..ff148703cbe --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.ThingOne +test.plugins.ThingTwo diff --git a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..89b0af611e9 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.ReadVersionFromResource diff --git a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..89b0af611e9 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.ReadVersionFromResource diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-configurable/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/sampling-configurable/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..232b881a39f --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/sampling-configurable/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.SamplingConfigurable diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/runtime/src/test/resources/test-plugins/sampling-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 00000000000..e83aba248ab --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/sampling-connector/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.SamplingConnector diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-converter/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/sampling-converter/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..00ece8187bf --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/sampling-converter/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.SamplingConverter diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 00000000000..66291d24c66 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.SamplingHeaderConverter diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..3dfaea697b3 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.ServiceLoaderPlugin diff --git a/connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..418027308ec --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.SubclassOfClasspathConverter