Browse Source

KAFKA-15244: Remove PluginType.from(Class) (#14089)

Reviewers: Chris Egerton <chrise@aiven.io>
pull/13417/merge
Greg Harris 1 year ago committed by GitHub
parent
commit
b9a45546a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
  2. 14
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
  3. 26
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
  4. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
  5. 25
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
  6. 16
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
  7. 11
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
  8. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
  9. 33
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java
  10. 18
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java
  11. 5
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
  12. 3
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
  13. 128
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
  14. 14
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
  15. 18
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
  16. 7
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
  17. 93
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java

41
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java

@ -33,7 +33,6 @@ import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest; @@ -33,7 +33,6 @@ import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
@ -842,34 +841,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con @@ -842,34 +841,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) {
Object plugin = p.newPlugin(pluginName);
PluginType pluginType = PluginType.from(plugin.getClass());
// Contains definitions coming from Connect framework
ConfigDef baseConfigDefs = null;
// Contains definitions specifically declared on the plugin
ConfigDef pluginConfigDefs;
switch (pluginType) {
case SINK:
baseConfigDefs = SinkConnectorConfig.configDef();
pluginConfigDefs = ((SinkConnector) plugin).config();
break;
case SOURCE:
baseConfigDefs = SourceConnectorConfig.configDef();
pluginConfigDefs = ((SourceConnector) plugin).config();
break;
case CONVERTER:
pluginConfigDefs = ((Converter) plugin).config();
break;
case HEADER_CONVERTER:
pluginConfigDefs = ((HeaderConverter) plugin).config();
break;
case TRANSFORMATION:
pluginConfigDefs = ((Transformation<?>) plugin).config();
break;
case PREDICATE:
pluginConfigDefs = ((Predicate<?>) plugin).config();
break;
default:
throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
if (plugin instanceof SinkConnector) {
baseConfigDefs = SinkConnectorConfig.configDef();
pluginConfigDefs = ((SinkConnector) plugin).config();
} else if (plugin instanceof SourceConnector) {
baseConfigDefs = SourceConnectorConfig.configDef();
pluginConfigDefs = ((SourceConnector) plugin).config();
} else if (plugin instanceof Converter) {
pluginConfigDefs = ((Converter) plugin).config();
} else if (plugin instanceof HeaderConverter) {
pluginConfigDefs = ((HeaderConverter) plugin).config();
} else if (plugin instanceof Transformation) {
pluginConfigDefs = ((Transformation<?>) plugin).config();
} else if (plugin instanceof Predicate) {
pluginConfigDefs = ((Predicate<?>) plugin).config();
} else {
throw new BadRequestException("Invalid plugin class " + pluginName + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
}
// Track config properties by name and, if the same property is defined in multiple places,

14
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java

@ -55,19 +55,7 @@ public class PluginClassLoader extends URLClassLoader { @@ -55,19 +55,7 @@ public class PluginClassLoader extends URLClassLoader {
*/
public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
super(urls, parent);
this.pluginLocation = pluginLocation;
}
/**
* Constructor that defines the system classloader as parent of this plugin classloader.
*
* @param pluginLocation the top-level location of the plugin to be loaded in isolation by this
* classloader.
* @param urls the list of urls from which to load classes and resources for this plugin.
*/
public PluginClassLoader(URL pluginLocation, URL[] urls) {
super(urls);
this.pluginLocation = pluginLocation;
this.pluginLocation = Objects.requireNonNull(pluginLocation, "Plugin location must be non-null");
}
/**

26
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java

@ -21,7 +21,7 @@ import org.apache.maven.artifact.versioning.DefaultArtifactVersion; @@ -21,7 +21,7 @@ import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import java.util.Objects;
public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
public class PluginDesc<T> implements Comparable<PluginDesc<?>> {
public static final String UNDEFINED_VERSION = "undefined";
private final Class<? extends T> klass;
private final String name;
@ -32,15 +32,16 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> { @@ -32,15 +32,16 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
private final String location;
private final ClassLoader loader;
public PluginDesc(Class<? extends T> klass, String version, ClassLoader loader) {
this.klass = klass;
this.name = klass.getName();
public PluginDesc(Class<? extends T> klass, String version, PluginType type, ClassLoader loader) {
this.klass = Objects.requireNonNull(klass, "Plugin class must be non-null");
this.name = this.klass.getName();
this.version = version != null ? version : "null";
this.encodedVersion = new DefaultArtifactVersion(this.version);
this.type = PluginType.from(klass);
this.typeName = type.toString();
this.type = Objects.requireNonNull(type, "Plugin type must be non-null");
this.typeName = this.type.toString();
Objects.requireNonNull(loader, "Plugin classloader must be non-null");
this.location = loader instanceof PluginClassLoader
? ((PluginClassLoader) loader).location()
? Objects.requireNonNull(((PluginClassLoader) loader).location(), "Plugin location must be non-null")
: "classpath";
this.loader = loader;
}
@ -110,11 +111,18 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> { @@ -110,11 +111,18 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
}
@Override
public int compareTo(PluginDesc<T> other) {
public int compareTo(PluginDesc<?> other) {
int nameComp = name.compareTo(other.name);
int versionComp = encodedVersion.compareTo(other.encodedVersion);
// isolated plugins appear after classpath plugins when they have identical versions.
int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader);
return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp);
// choose an arbitrary order between different locations and types
int loaderComp = location.compareTo(other.location);
int typeComp = type.compareTo(other.type);
return nameComp != 0 ? nameComp :
versionComp != 0 ? versionComp :
isolatedComp != 0 ? isolatedComp :
loaderComp != 0 ? loaderComp :
typeComp;
}
}

2
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java

@ -88,7 +88,7 @@ public class PluginScanResult { @@ -88,7 +88,7 @@ public class PluginScanResult {
);
}
private static <R extends Comparable<R>> SortedSet<R> merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> accessor) {
private static <R extends Comparable<?>> SortedSet<R> merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> accessor) {
SortedSet<R> merged = new TreeSet<>();
for (PluginScanResult element : results) {
merged.addAll(accessor.apply(element));

25
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java

@ -120,32 +120,32 @@ public abstract class PluginScanner { @@ -120,32 +120,32 @@ public abstract class PluginScanner {
}
@SuppressWarnings({"rawtypes", "unchecked"})
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
return new PluginDesc(plugin, version, source.loader());
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginType type, PluginSource source) {
return new PluginDesc(plugin, version, type, source.loader());
}
@SuppressWarnings("unchecked")
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(PluginType type, PluginSource source) {
SortedSet<PluginDesc<T>> result = new TreeSet<>();
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, source.loader());
ServiceLoader<T> serviceLoader = ServiceLoader.load((Class<T>) type.superClass(), source.loader());
Iterator<T> iterator = serviceLoader.iterator();
while (handleLinkageError(klass, source, iterator::hasNext)) {
while (handleLinkageError(type, source, iterator::hasNext)) {
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
T pluginImpl;
try {
pluginImpl = handleLinkageError(klass, source, iterator::next);
pluginImpl = handleLinkageError(type, source, iterator::next);
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {} in {}{}",
klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
continue;
}
Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
type.simpleName(), pluginKlass.getClassLoader(), source.location());
continue;
}
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), type, source));
}
}
return result;
@ -154,14 +154,13 @@ public abstract class PluginScanner { @@ -154,14 +154,13 @@ public abstract class PluginScanner {
/**
* Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
*
* @param klass The plugin superclass which is being loaded
* @param type The plugin type which is being loaded
* @param function A function on a {@link ServiceLoader}'s {@link Iterator} which may throw {@link LinkageError}
* @return the return value of function
* @throws Error errors thrown by the passed-in function
* @param <T> Type being iterated over by the ServiceLoader
* @param <U> Return value of the passed-in function
*/
private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
private <U> U handleLinkageError(PluginType type, PluginSource source, Supplier<U> function) {
// It's difficult to know for sure if the iterator was able to advance past the first broken
// plugin class, or if it will continue to fail on that broken class for any subsequent calls
// to Iterator::hasNext or Iterator::next
@ -182,7 +181,7 @@ public abstract class PluginScanner { @@ -182,7 +181,7 @@ public abstract class PluginScanner {
|| !Objects.equals(lastError.getClass(), t.getClass())
|| !Objects.equals(lastError.getMessage(), t.getMessage())) {
log.error("Failed to discover {} in {}{}",
klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
}
lastError = t;
}

16
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java

@ -37,8 +37,7 @@ public enum PluginType { @@ -37,8 +37,7 @@ public enum PluginType {
PREDICATE(Predicate.class),
CONFIGPROVIDER(ConfigProvider.class),
REST_EXTENSION(ConnectRestExtension.class),
CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class),
UNKNOWN(Object.class);
CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class);
private final Class<?> klass;
@ -46,19 +45,14 @@ public enum PluginType { @@ -46,19 +45,14 @@ public enum PluginType {
this.klass = klass;
}
public static PluginType from(Class<?> klass) {
for (PluginType type : PluginType.values()) {
if (type.klass.isAssignableFrom(klass)) {
return type;
}
}
return UNKNOWN;
}
public String simpleName() {
return klass.getSimpleName();
}
public Class<?> superClass() {
return klass;
}
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);

11
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java

@ -35,6 +35,7 @@ import java.util.Collections; @@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
@ -196,12 +197,12 @@ public class PluginUtils { @@ -196,12 +197,12 @@ public class PluginUtils {
return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
}
public static List<Path> pluginLocations(String pluginPath) {
public static Set<Path> pluginLocations(String pluginPath) {
if (pluginPath == null) {
return Collections.emptyList();
return Collections.emptySet();
}
String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
List<Path> pluginLocations = new ArrayList<>();
Set<Path> pluginLocations = new LinkedHashSet<>();
for (String path : pluginPathElements) {
try {
Path pluginPathElement = Paths.get(path).toAbsolutePath();
@ -328,8 +329,8 @@ public class PluginUtils { @@ -328,8 +329,8 @@ public class PluginUtils {
return Arrays.asList(archives.toArray(new Path[0]));
}
public static Set<PluginSource> pluginSources(List<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) {
Set<PluginSource> pluginSources = new HashSet<>();
public static Set<PluginSource> pluginSources(Set<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) {
Set<PluginSource> pluginSources = new LinkedHashSet<>();
for (Path pluginLocation : pluginLocations) {
try {

2
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java

@ -63,7 +63,7 @@ public class Plugins { @@ -63,7 +63,7 @@ public class Plugins {
// VisibleForTesting
Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) {
String pluginPath = WorkerConfig.pluginPath(props);
List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
delegatingLoader = factory.newDelegatingClassLoader(parent);
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
scanResult = initLoaders(pluginSources);

33
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java

@ -69,7 +69,7 @@ public class ReflectionScanner extends PluginScanner { @@ -69,7 +69,7 @@ public class ReflectionScanner extends PluginScanner {
private static final Logger log = LoggerFactory.getLogger(ReflectionScanner.class);
public static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException {
private static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException {
T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
return versionFor(pluginImpl);
}
@ -84,39 +84,40 @@ public class ReflectionScanner extends PluginScanner { @@ -84,39 +84,40 @@ public class ReflectionScanner extends PluginScanner {
Reflections reflections = new Reflections(builder);
return new PluginScanResult(
getPluginDesc(reflections, SinkConnector.class, source),
getPluginDesc(reflections, SourceConnector.class, source),
getPluginDesc(reflections, Converter.class, source),
getPluginDesc(reflections, HeaderConverter.class, source),
getPluginDesc(reflections, PluginType.SINK, source),
getPluginDesc(reflections, PluginType.SOURCE, source),
getPluginDesc(reflections, PluginType.CONVERTER, source),
getPluginDesc(reflections, PluginType.HEADER_CONVERTER, source),
getTransformationPluginDesc(source, reflections),
getPredicatePluginDesc(source, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, source),
getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source)
);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, source);
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, PluginType.PREDICATE, source);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, source);
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, PluginType.TRANSFORMATION, source);
}
@SuppressWarnings({"unchecked"})
private <T> SortedSet<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
PluginType type,
PluginSource source
) {
Set<Class<? extends T>> plugins;
try {
plugins = reflections.getSubTypesOf(klass);
plugins = reflections.getSubTypesOf((Class<T>) type.superClass());
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any {} in {} for URLs: {}",
klass, source.location(), source.urls(), e);
type, source.location(), source.urls(), e);
return Collections.emptySortedSet();
}
@ -128,14 +129,14 @@ public class ReflectionScanner extends PluginScanner { @@ -128,14 +129,14 @@ public class ReflectionScanner extends PluginScanner {
}
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
pluginKlass, pluginKlass.getClassLoader(), source.location());
continue;
}
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), source));
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), type, source));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {} in {}: Unable to instantiate {}{}",
klass.getSimpleName(), source.location(), pluginKlass.getSimpleName(),
type.simpleName(), source.location(), pluginKlass.getSimpleName(),
reflectiveErrorDescription(e), e);
}
}

18
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java

@ -56,25 +56,25 @@ public class ServiceLoaderScanner extends PluginScanner { @@ -56,25 +56,25 @@ public class ServiceLoaderScanner extends PluginScanner {
@Override
protected PluginScanResult scanPlugins(PluginSource source) {
return new PluginScanResult(
getServiceLoaderPluginDesc(SinkConnector.class, source),
getServiceLoaderPluginDesc(SourceConnector.class, source),
getServiceLoaderPluginDesc(Converter.class, source),
getServiceLoaderPluginDesc(HeaderConverter.class, source),
getServiceLoaderPluginDesc(PluginType.SINK, source),
getServiceLoaderPluginDesc(PluginType.SOURCE, source),
getServiceLoaderPluginDesc(PluginType.CONVERTER, source),
getServiceLoaderPluginDesc(PluginType.HEADER_CONVERTER, source),
getTransformationPluginDesc(source),
getPredicatePluginDesc(source),
getServiceLoaderPluginDesc(ConfigProvider.class, source),
getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source)
);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Predicate.class, source);
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(PluginType.PREDICATE, source);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Transformation.class, source);
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(PluginType.TRANSFORMATION, source);
}
}

5
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java

@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.NotFoundException; @@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@ -667,12 +668,12 @@ public class AbstractHerderTest { @@ -667,12 +668,12 @@ public class AbstractHerderTest {
@SuppressWarnings({"rawtypes", "unchecked"})
private PluginDesc<Predicate<?>> predicatePluginDesc() {
return new PluginDesc(SamplePredicate.class, "1.0", classLoader);
return new PluginDesc(SamplePredicate.class, "1.0", PluginType.PREDICATE, classLoader);
}
@SuppressWarnings({"rawtypes", "unchecked"})
private PluginDesc<Transformation<?>> transformationPluginDesc() {
return new PluginDesc(SampleTransformation.class, "1.0", classLoader);
return new PluginDesc(SampleTransformation.class, "1.0", PluginType.TRANSFORMATION, classLoader);
}
@Test

3
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java

@ -61,7 +61,8 @@ public class DelegatingClassLoaderTest { @@ -61,7 +61,8 @@ public class DelegatingClassLoaderTest {
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
// Lie to the DCL that this arbitrary class is a connector, since all real connector classes we have access to
// are forced to be non-isolated by PluginUtils.shouldLoadInIsolation.
pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) ARBITRARY_CLASS, null, pluginLoader);
when(pluginLoader.location()).thenReturn("some-location");
pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) ARBITRARY_CLASS, null, PluginType.SINK, pluginLoader);
assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className()));
sinkConnectors.add(pluginDesc);
scanResult = new PluginScanResult(

128
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java

@ -17,10 +17,13 @@ @@ -17,10 +17,13 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.junit.Before;
@ -31,7 +34,10 @@ import java.nio.file.Paths; @@ -31,7 +34,10 @@ import java.nio.file.Paths;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PluginDescTest {
private final ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
@ -40,41 +46,47 @@ public class PluginDescTest { @@ -40,41 +46,47 @@ public class PluginDescTest {
private final String snapshotVersion = "1.0.0-SNAPSHOT";
private final String noVersion = "undefined";
private PluginClassLoader pluginLoader;
private PluginClassLoader otherPluginLoader;
@Before
public void setUp() throws Exception {
// Fairly simple use case, thus no need to create a random directory here yet.
URL location = Paths.get("/tmp").toUri().toURL();
URL otherLocation = Paths.get("/tmp-other").toUri().toURL();
// Normally parent will be a DelegatingClassLoader.
pluginLoader = new PluginClassLoader(location, new URL[0], systemLoader);
otherPluginLoader = new PluginClassLoader(otherLocation, new URL[0], systemLoader);
}
@SuppressWarnings("rawtypes")
@Test
public void testRegularPluginDesc() {
PluginDesc<Connector> connectorDesc = new PluginDesc<>(
Connector.class,
PluginDesc<SinkConnector> connectorDesc = new PluginDesc<>(
SinkConnector.class,
regularVersion,
PluginType.SINK,
pluginLoader
);
assertPluginDesc(connectorDesc, Connector.class, regularVersion, pluginLoader.location());
assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, PluginType.SINK, pluginLoader.location());
PluginDesc<Converter> converterDesc = new PluginDesc<>(
Converter.class,
snapshotVersion,
PluginType.CONVERTER,
pluginLoader
);
assertPluginDesc(converterDesc, Converter.class, snapshotVersion, pluginLoader.location());
assertPluginDesc(converterDesc, Converter.class, snapshotVersion, PluginType.CONVERTER, pluginLoader.location());
PluginDesc<Transformation> transformDesc = new PluginDesc<>(
Transformation.class,
noVersion,
PluginType.TRANSFORMATION,
pluginLoader
);
assertPluginDesc(transformDesc, Transformation.class, noVersion, pluginLoader.location());
assertPluginDesc(transformDesc, Transformation.class, noVersion, PluginType.TRANSFORMATION, pluginLoader.location());
}
@SuppressWarnings("rawtypes")
@ -84,26 +96,29 @@ public class PluginDescTest { @@ -84,26 +96,29 @@ public class PluginDescTest {
PluginDesc<SinkConnector> connectorDesc = new PluginDesc<>(
SinkConnector.class,
regularVersion,
PluginType.SINK,
systemLoader
);
assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, location);
assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, PluginType.SINK, location);
PluginDesc<Converter> converterDesc = new PluginDesc<>(
Converter.class,
snapshotVersion,
PluginType.CONVERTER,
systemLoader
);
assertPluginDesc(converterDesc, Converter.class, snapshotVersion, location);
assertPluginDesc(converterDesc, Converter.class, snapshotVersion, PluginType.CONVERTER, location);
PluginDesc<Transformation> transformDesc = new PluginDesc<>(
Transformation.class,
noVersion,
PluginType.TRANSFORMATION,
systemLoader
);
assertPluginDesc(transformDesc, Transformation.class, noVersion, location);
assertPluginDesc(transformDesc, Transformation.class, noVersion, PluginType.TRANSFORMATION, location);
}
@Test
@ -112,6 +127,7 @@ public class PluginDescTest { @@ -112,6 +127,7 @@ public class PluginDescTest {
PluginDesc<SourceConnector> connectorDesc = new PluginDesc<>(
SourceConnector.class,
null,
PluginType.SOURCE,
pluginLoader
);
@ -119,6 +135,7 @@ public class PluginDescTest { @@ -119,6 +135,7 @@ public class PluginDescTest {
connectorDesc,
SourceConnector.class,
nullVersion,
PluginType.SOURCE,
pluginLoader.location()
);
@ -126,24 +143,27 @@ public class PluginDescTest { @@ -126,24 +143,27 @@ public class PluginDescTest {
PluginDesc<Converter> converterDesc = new PluginDesc<>(
Converter.class,
null,
PluginType.CONVERTER,
systemLoader
);
assertPluginDesc(converterDesc, Converter.class, nullVersion, location);
assertPluginDesc(converterDesc, Converter.class, nullVersion, PluginType.CONVERTER, location);
}
@SuppressWarnings("rawtypes")
@Test
public void testPluginDescEquality() {
PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
Connector.class,
PluginDesc<SinkConnector> connectorDescPluginPath = new PluginDesc<>(
SinkConnector.class,
snapshotVersion,
PluginType.SINK,
pluginLoader
);
PluginDesc<Connector> connectorDescClasspath = new PluginDesc<>(
Connector.class,
PluginDesc<SinkConnector> connectorDescClasspath = new PluginDesc<>(
SinkConnector.class,
snapshotVersion,
PluginType.SINK,
systemLoader
);
@ -153,12 +173,14 @@ public class PluginDescTest { @@ -153,12 +173,14 @@ public class PluginDescTest {
PluginDesc<Converter> converterDescPluginPath = new PluginDesc<>(
Converter.class,
noVersion,
PluginType.CONVERTER,
pluginLoader
);
PluginDesc<Converter> converterDescClasspath = new PluginDesc<>(
Converter.class,
noVersion,
PluginType.CONVERTER,
systemLoader
);
@ -168,30 +190,34 @@ public class PluginDescTest { @@ -168,30 +190,34 @@ public class PluginDescTest {
PluginDesc<Transformation> transformDescPluginPath = new PluginDesc<>(
Transformation.class,
null,
PluginType.TRANSFORMATION,
pluginLoader
);
PluginDesc<Transformation> transformDescClasspath = new PluginDesc<>(
Transformation.class,
noVersion,
PluginType.TRANSFORMATION,
pluginLoader
);
assertNotEquals(transformDescPluginPath, transformDescClasspath);
}
@SuppressWarnings("rawtypes")
@SuppressWarnings({"rawtypes", "unchecked"})
@Test
public void testPluginDescComparison() {
PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
Connector.class,
PluginDesc<SinkConnector> connectorDescPluginPath = new PluginDesc<>(
SinkConnector.class,
regularVersion,
PluginType.SINK,
pluginLoader
);
PluginDesc<Connector> connectorDescClasspath = new PluginDesc<>(
Connector.class,
PluginDesc<SinkConnector> connectorDescClasspath = new PluginDesc<>(
SinkConnector.class,
newerVersion,
PluginType.SINK,
systemLoader
);
@ -200,12 +226,14 @@ public class PluginDescTest { @@ -200,12 +226,14 @@ public class PluginDescTest {
PluginDesc<Converter> converterDescPluginPath = new PluginDesc<>(
Converter.class,
noVersion,
PluginType.CONVERTER,
pluginLoader
);
PluginDesc<Converter> converterDescClasspath = new PluginDesc<>(
Converter.class,
snapshotVersion,
PluginType.CONVERTER,
systemLoader
);
@ -214,12 +242,14 @@ public class PluginDescTest { @@ -214,12 +242,14 @@ public class PluginDescTest {
PluginDesc<Transformation> transformDescPluginPath = new PluginDesc<>(
Transformation.class,
null,
PluginType.TRANSFORMATION,
pluginLoader
);
PluginDesc<Transformation> transformDescClasspath = new PluginDesc<>(
Transformation.class,
regularVersion,
PluginType.TRANSFORMATION,
systemLoader
);
@ -228,33 +258,87 @@ public class PluginDescTest { @@ -228,33 +258,87 @@ public class PluginDescTest {
PluginDesc<Predicate> predicateDescPluginPath = new PluginDesc<>(
Predicate.class,
regularVersion,
PluginType.PREDICATE,
pluginLoader
);
PluginDesc<Predicate> predicateDescClasspath = new PluginDesc<>(
Predicate.class,
regularVersion,
PluginType.PREDICATE,
systemLoader
);
assertNewer(predicateDescPluginPath, predicateDescClasspath);
PluginDesc<ConfigProvider> configProviderDescPluginPath = new PluginDesc<>(
FileConfigProvider.class,
regularVersion,
PluginType.CONFIGPROVIDER,
pluginLoader
);
PluginDesc<ConfigProvider> configProviderDescOtherPluginLoader = new PluginDesc<>(
FileConfigProvider.class,
regularVersion,
PluginType.CONFIGPROVIDER,
otherPluginLoader
);
assertTrue("Different plugin loaders should have an ordering",
configProviderDescPluginPath.compareTo(configProviderDescOtherPluginLoader) != 0);
PluginDesc<Converter> jsonConverterPlugin = new PluginDesc<>(
JsonConverter.class,
regularVersion,
PluginType.CONVERTER,
systemLoader
);
PluginDesc<HeaderConverter> jsonHeaderConverterPlugin = new PluginDesc<>(
JsonConverter.class,
regularVersion,
PluginType.HEADER_CONVERTER,
systemLoader
);
assertNewer(jsonConverterPlugin, jsonHeaderConverterPlugin);
}
@Test
public void testNullArguments() {
// Null version is acceptable
PluginDesc<SinkConnector> sink = new PluginDesc<>(SinkConnector.class, null, PluginType.SINK, systemLoader);
assertEquals("null", sink.version());
// Direct nulls are not acceptable for other arguments
assertThrows(NullPointerException.class, () -> new PluginDesc<>(null, regularVersion, PluginType.SINK, systemLoader));
assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, null, systemLoader));
assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, PluginType.SINK, null));
// PluginClassLoaders must have non-null locations
PluginClassLoader nullLocationLoader = mock(PluginClassLoader.class);
when(nullLocationLoader.location()).thenReturn(null);
assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, PluginType.SINK, nullLocationLoader));
}
private static <T> void assertPluginDesc(
PluginDesc<T> desc,
Class<? extends T> klass,
String version,
PluginType type,
String location
) {
assertEquals(desc.pluginClass(), klass);
assertEquals(desc.className(), klass.getName());
assertEquals(desc.version(), version);
assertEquals(desc.type(), PluginType.from(klass));
assertEquals(desc.typeName(), PluginType.from(klass).toString());
assertEquals(desc.type(), type);
assertEquals(desc.typeName(), type.toString());
assertEquals(desc.location(), location);
}
private static <T> void assertNewer(PluginDesc<T> older, PluginDesc<T> newer) {
private static void assertNewer(PluginDesc<?> older, PluginDesc<?> newer) {
assertTrue(newer + " should be newer than " + older, older.compareTo(newer) < 0);
}
}

14
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java

@ -70,7 +70,7 @@ public class PluginScannerTest { @@ -70,7 +70,7 @@ public class PluginScannerTest {
@Test
public void testScanningEmptyPluginPath() {
PluginScanResult result = scan(
Collections.emptyList()
Collections.emptySet()
);
assertTrue(result.isEmpty());
}
@ -91,7 +91,7 @@ public class PluginScannerTest { @@ -91,7 +91,7 @@ public class PluginScannerTest {
pluginDir.newFile("invalid.jar");
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}
@ -102,7 +102,7 @@ public class PluginScannerTest { @@ -102,7 +102,7 @@ public class PluginScannerTest {
pluginDir.newFile("my-plugin/invalid.jar");
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}
@ -110,7 +110,7 @@ public class PluginScannerTest { @@ -110,7 +110,7 @@ public class PluginScannerTest {
@Test
public void testScanningNoPlugins() {
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}
@ -120,7 +120,7 @@ public class PluginScannerTest { @@ -120,7 +120,7 @@ public class PluginScannerTest {
pluginDir.newFolder("my-plugin");
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}
@ -137,7 +137,7 @@ public class PluginScannerTest { @@ -137,7 +137,7 @@ public class PluginScannerTest {
}
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath())
);
Set<String> classes = new HashSet<>();
result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
@ -145,7 +145,7 @@ public class PluginScannerTest { @@ -145,7 +145,7 @@ public class PluginScannerTest {
assertEquals(expectedClasses, classes);
}
private PluginScanResult scan(List<Path> pluginLocations) {
private PluginScanResult scan(Set<Path> pluginLocations) {
ClassLoaderFactory factory = new ClassLoaderFactory();
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory);
return scanner.discoverPlugins(pluginSources);

18
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java

@ -508,11 +508,11 @@ public class PluginUtilsTest { @@ -508,11 +508,11 @@ public class PluginUtilsTest {
@Test
public void testNonCollidingAliases() {
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader()));
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, PluginType.SINK, MockSinkConnector.class.getClassLoader()));
SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>();
sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader()));
sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, PluginType.SOURCE, MockSourceConnector.class.getClassLoader()));
SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader()));
converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader()));
PluginScanResult result = new PluginScanResult(
sinkConnectors,
sourceConnectors,
@ -540,8 +540,8 @@ public class PluginUtilsTest { @@ -540,8 +540,8 @@ public class PluginUtilsTest {
public void testMultiVersionAlias() {
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
// distinct versions don't cause an alias collision (the class name is the same)
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader()));
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader()));
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, PluginType.SINK, MockSinkConnector.class.getClassLoader()));
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", PluginType.SINK, MockSinkConnector.class.getClassLoader()));
assertEquals(2, sinkConnectors.size());
PluginScanResult result = new PluginScanResult(
sinkConnectors,
@ -564,9 +564,9 @@ public class PluginUtilsTest { @@ -564,9 +564,9 @@ public class PluginUtilsTest {
@Test
public void testCollidingPrunedAlias() {
SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader()));
converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader()));
SortedSet<PluginDesc<HeaderConverter>> headerConverters = new TreeSet<>();
headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader()));
headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, PluginType.HEADER_CONVERTER, CollidingHeaderConverter.class.getClassLoader()));
PluginScanResult result = new PluginScanResult(
Collections.emptySortedSet(),
Collections.emptySortedSet(),
@ -589,9 +589,9 @@ public class PluginUtilsTest { @@ -589,9 +589,9 @@ public class PluginUtilsTest {
@Test
public void testCollidingSimpleAlias() {
SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader()));
converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader()));
SortedSet<PluginDesc<Transformation<?>>> transformations = new TreeSet<>();
transformations.add(new PluginDesc<>((Class<? extends Transformation<?>>) (Class<?>) Colliding.class, null, Colliding.class.getClassLoader()));
transformations.add(new PluginDesc<>((Class<? extends Transformation<?>>) (Class<?>) Colliding.class, null, PluginType.TRANSFORMATION, Colliding.class.getClassLoader()));
PluginScanResult result = new PluginScanResult(
Collections.emptySortedSet(),
Collections.emptySortedSet(),

7
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java

@ -34,6 +34,7 @@ import java.util.HashMap; @@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
@ -248,7 +249,7 @@ public class TestPlugins { @@ -248,7 +249,7 @@ public class TestPlugins {
* @return A list of plugin jar filenames
* @throws AssertionError if any plugin failed to load, or no plugins were loaded.
*/
public static List<Path> pluginPath() {
public static Set<Path> pluginPath() {
return pluginPath(defaultPlugins());
}
@ -262,14 +263,14 @@ public class TestPlugins { @@ -262,14 +263,14 @@ public class TestPlugins {
* @return A list of plugin jar filenames containing the specified test plugins
* @throws AssertionError if any plugin failed to load, or no plugins were loaded.
*/
public static List<Path> pluginPath(TestPlugin... plugins) {
public static Set<Path> pluginPath(TestPlugin... plugins) {
assertAvailable();
return Arrays.stream(plugins)
.filter(Objects::nonNull)
.map(TestPlugin::resourceDir)
.distinct()
.map(PLUGIN_JARS::get)
.collect(Collectors.toList());
.collect(Collectors.toSet());
}
public static String pluginPathJoined(TestPlugin... plugins) {

93
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java

@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigDef.Recommender; @@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigDef.Recommender;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.converters.LongConverter;
@ -33,11 +34,9 @@ import org.apache.kafka.connect.runtime.Herder; @@ -33,11 +34,9 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.SampleSinkConnector;
import org.apache.kafka.connect.runtime.SampleSourceConnector;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@ -61,7 +60,6 @@ import org.junit.Test; @@ -61,7 +60,6 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
import javax.ws.rs.BadRequestException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -109,35 +107,38 @@ public class ConnectorPluginsResourceTest { @@ -109,35 +107,38 @@ public class ConnectorPluginsResourceTest {
private static final ConfigInfos PARTIAL_CONFIG_INFOS;
private static final int ERROR_COUNT = 0;
private static final int PARTIAL_CONFIG_ERROR_COUNT = 1;
private static final Set<MockConnectorPluginDesc<?>> SINK_CONNECTOR_PLUGINS = new TreeSet<>();
private static final Set<MockConnectorPluginDesc<?>> SOURCE_CONNECTOR_PLUGINS = new TreeSet<>();
private static final Set<MockConnectorPluginDesc<?>> CONVERTER_PLUGINS = new TreeSet<>();
private static final Set<MockConnectorPluginDesc<?>> HEADER_CONVERTER_PLUGINS = new TreeSet<>();
private static final Set<MockConnectorPluginDesc<?>> TRANSFORMATION_PLUGINS = new TreeSet<>();
private static final Set<MockConnectorPluginDesc<?>> PREDICATE_PLUGINS = new TreeSet<>();
private static final Set<PluginDesc<?>> SINK_CONNECTOR_PLUGINS = new TreeSet<>();
private static final Set<PluginDesc<?>> SOURCE_CONNECTOR_PLUGINS = new TreeSet<>();
private static final Set<PluginDesc<?>> CONVERTER_PLUGINS = new TreeSet<>();
private static final Set<PluginDesc<?>> HEADER_CONVERTER_PLUGINS = new TreeSet<>();
private static final Set<PluginDesc<?>> TRANSFORMATION_PLUGINS = new TreeSet<>();
private static final Set<PluginDesc<?>> PREDICATE_PLUGINS = new TreeSet<>();
static {
try {
SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSinkConnector.class));
SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSinkConnector.class));
ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader();
String appVersion = AppInfoParser.getVersion();
SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK, classLoader));
SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK, classLoader));
SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSourceConnector.class));
SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSourceConnector.class));
SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(SchemaSourceConnector.class));
SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(ConnectorPluginsResourceTestConnector.class));
SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSourceConnector.class, appVersion, PluginType.SOURCE, classLoader));
SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSourceConnector.class, appVersion, PluginType.SOURCE, classLoader));
SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(SchemaSourceConnector.class, appVersion, PluginType.SOURCE, classLoader));
SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(ConnectorPluginsResourceTestConnector.class, appVersion, PluginType.SOURCE, classLoader));
CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class));
CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class));
CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader));
CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader));
HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class));
HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class));
HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader));
HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader));
TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(RegexRouter.class));
TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(TimestampConverter.Key.class));
TRANSFORMATION_PLUGINS.add(new PluginDesc<>(RegexRouter.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader));
TRANSFORMATION_PLUGINS.add(new PluginDesc<>(TimestampConverter.Key.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader));
PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(HasHeaderKey.class));
PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(RecordIsTombstone.class));
PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader));
PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader));
} catch (Exception e) {
e.printStackTrace();
fail("Failed setting up plugins");
}
}
@ -344,7 +345,7 @@ public class ConnectorPluginsResourceTest { @@ -344,7 +345,7 @@ public class ConnectorPluginsResourceTest {
Set<PluginInfo> expectedConnectorPlugins = Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS)
.flatMap(Collection::stream)
.filter(p -> !excludes.contains(p.pluginClass()))
.map(ConnectorPluginsResourceTest::newInfo)
.map(PluginInfo::new)
.collect(Collectors.toSet());
Set<PluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(true));
assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
@ -353,8 +354,9 @@ public class ConnectorPluginsResourceTest { @@ -353,8 +354,9 @@ public class ConnectorPluginsResourceTest {
@Test
public void testConnectorPluginsIncludesClassTypeAndVersionInformation() throws Exception {
PluginInfo sinkInfo = newInfo(SampleSinkConnector.class);
PluginInfo sourceInfo = newInfo(SampleSourceConnector.class);
ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader();
PluginInfo sinkInfo = new PluginInfo(new PluginDesc<>(SampleSinkConnector.class, SampleSinkConnector.VERSION, PluginType.SINK, classLoader));
PluginInfo sourceInfo = new PluginInfo(new PluginDesc<>(SampleSourceConnector.class, SampleSourceConnector.VERSION, PluginType.SOURCE, classLoader));
assertEquals(PluginType.SINK.toString(), sinkInfo.type());
assertEquals(PluginType.SOURCE.toString(), sourceInfo.type());
assertEquals(SampleSinkConnector.VERSION, sinkInfo.version());
@ -399,7 +401,7 @@ public class ConnectorPluginsResourceTest { @@ -399,7 +401,7 @@ public class ConnectorPluginsResourceTest {
PREDICATE_PLUGINS
).flatMap(Collection::stream)
.filter(p -> !excludes.contains(p.pluginClass()))
.map(ConnectorPluginsResourceTest::newInfo)
.map(PluginInfo::new)
.collect(Collectors.toSet());
Set<PluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
@ -424,41 +426,6 @@ public class ConnectorPluginsResourceTest { @@ -424,41 +426,6 @@ public class ConnectorPluginsResourceTest {
}
}
protected static PluginInfo newInfo(PluginDesc<?> pluginDesc) {
return new PluginInfo(new MockConnectorPluginDesc<>(pluginDesc.pluginClass(), pluginDesc.version()));
}
protected static PluginInfo newInfo(Class<?> klass)
throws Exception {
return new PluginInfo(new MockConnectorPluginDesc<>(klass));
}
public static class MockPluginClassLoader extends PluginClassLoader {
public MockPluginClassLoader(URL pluginLocation, URL[] urls) {
super(pluginLocation, urls);
}
@Override
public String location() {
return "/tmp/mockpath";
}
}
public static class MockConnectorPluginDesc<T> extends PluginDesc<T> {
public MockConnectorPluginDesc(Class<T> klass, String version) {
super(klass, version, new MockPluginClassLoader(null, new URL[0]));
}
public MockConnectorPluginDesc(Class<T> klass) throws Exception {
super(
klass,
ReflectionScanner.versionFor(klass),
new MockPluginClassLoader(null, new URL[0])
);
}
}
/* Name here needs to be unique as we are testing the aliasing mechanism */
public static class ConnectorPluginsResourceTestConnector extends SourceConnector {
@ -476,7 +443,7 @@ public class ConnectorPluginsResourceTest { @@ -476,7 +443,7 @@ public class ConnectorPluginsResourceTest {
@Override
public String version() {
return "1.0";
return AppInfoParser.getVersion();
}
@Override

Loading…
Cancel
Save