
KAFKA-15030: Add connect-plugin-path command-line tool (#14064)

Reviewers: Chris Egerton <chrise@aiven.io>
Greg Harris authored 1 year ago, committed by GitHub (commit f5655d31d3)
  1. bin/connect-plugin-path.sh (new file, 21 lines)
  2. bin/windows/connect-plugin-path.bat (new file, 21 lines)
  3. build.gradle (5 lines changed)
  4. checkstyle/import-control.xml (2 lines changed)
  5. config/tools-log4j.properties (3 lines changed)
  6. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java (10 lines changed)
  7. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java (8 lines changed)
  8. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java (42 lines changed)
  9. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2 lines changed)
  10. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java (8 lines changed)
  11. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java (32 lines changed)
  12. connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy (new file, 16 lines)
  13. connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java (new file, 46 lines)
  14. connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java (new file, 57 lines)
  15. connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java (new file, 96 lines)
  16. connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java (new file, 51 lines)
  17. connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java (new file, 61 lines)
  18. connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSourceConnector.java (new file, 61 lines)
  19. connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java (new file, 50 lines)
  20. tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java (new file, 492 lines)
  21. tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java (new file, 516 lines)

bin/connect-plugin-path.sh (new file, 21 lines)

@@ -0,0 +1,21 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConnectPluginPath "$@"
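
The script is a thin wrapper that sets a default heap and delegates to kafka-run-class.sh. As a rough sketch of what it resolves to, the Java below drives the same entry point programmatically; the `list` subcommand and `--plugin-path` flag are defined in ConnectPluginPath.java further down in this commit, and the plugin directory shown is hypothetical.

    import org.apache.kafka.tools.ConnectPluginPath;

    public class ConnectPluginPathInvokeExample {
        public static void main(String[] args) {
            // Equivalent of: bin/connect-plugin-path.sh list --plugin-path /opt/connect/plugins
            // (/opt/connect/plugins is a made-up location)
            ConnectPluginPath.main(new String[] {"list", "--plugin-path", "/opt/connect/plugins"});
        }
    }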

bin/windows/connect-plugin-path.bat (new file, 21 lines)

@@ -0,0 +1,21 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xms256M -Xmx2G
)
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConnectPluginPath %*

build.gradle (5 lines changed)

@@ -1859,6 +1859,8 @@ project(':tools') {
dependencies {
implementation project(':clients')
implementation project(':server-common')
implementation project(':connect:api')
implementation project(':connect:runtime')
implementation project(':log4j-appender')
implementation project(':tools:tools-api')
implementation libs.argparse4j
@@ -1877,6 +1879,9 @@ project(':tools') {
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':connect:api')
testImplementation project(':connect:runtime')
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension

checkstyle/import-control.xml (2 lines changed)

@@ -308,6 +308,8 @@
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.connect.runtime" />
<allow pkg="org.apache.kafka.connect.runtime.isolation" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.jose4j" />
<allow pkg="net.sourceforge.argparse4j" />

config/tools-log4j.properties (3 lines changed)

@@ -19,3 +19,6 @@ log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err
# for connect-plugin-path
log4j.logger.org.reflections=ERROR
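
The added logger setting quiets the Reflections library, which otherwise logs noisily while the tool scans plugin locations; only ERROR-level messages reach stderr. For illustration only (not part of this commit), the same effect via the log4j 1.x API would look like:

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    public class QuietReflectionsExample {
        public static void main(String[] args) {
            // Equivalent of the properties line above: suppress org.reflections output below ERROR.
            Logger.getLogger("org.reflections").setLevel(Level.ERROR);
        }
    }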

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java (10 lines changed)

@@ -72,7 +72,7 @@ public abstract class PluginScanner {
}
private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
log.info("Loading plugin from: {}", source.location());
log.info("Loading plugin from: {}", source);
if (log.isDebugEnabled()) {
log.debug("Loading plugin urls: {}", Arrays.toString(source.urls()));
}
@@ -136,13 +136,13 @@ public abstract class PluginScanner {
pluginImpl = handleLinkageError(type, source, iterator::next);
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {} in {}{}",
type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
type.simpleName(), source, reflectiveErrorDescription(t.getCause()), t);
continue;
}
Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
type.simpleName(), pluginKlass.getClassLoader(), source.location());
type.simpleName(), pluginKlass.getClassLoader(), source);
continue;
}
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), type, source));
@@ -181,14 +181,14 @@ public abstract class PluginScanner {
|| !Objects.equals(lastError.getClass(), t.getClass())
|| !Objects.equals(lastError.getMessage(), t.getMessage())) {
log.error("Failed to discover {} in {}{}",
type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
type.simpleName(), source, reflectiveErrorDescription(t.getCause()), t);
}
lastError = t;
}
}
log.error("Received excessive ServiceLoader errors: assuming the runtime ServiceLoader implementation cannot " +
"skip faulty implementations. Use a different JRE, or resolve LinkageErrors for plugins in {}",
source.location(), lastError);
source, lastError);
throw lastError;
}

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java (8 lines changed)

@@ -18,13 +18,11 @@ package org.apache.kafka.connect.runtime.isolation;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Objects;
public class PluginSource {
public static final Path CLASSPATH = Paths.get("classpath");
private final Path location;
private final ClassLoader loader;
private final URL[] urls;
@@ -48,7 +46,7 @@ public class PluginSource {
}
public boolean isolated() {
return location != CLASSPATH;
return location != null;
}
@Override
@@ -65,4 +63,8 @@
result = 31 * result + Arrays.hashCode(urls);
return result;
}
public String toString() {
return location == null ? "classpath" : location.toString();
}
}
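
With this change the classpath source is modeled as a PluginSource whose location is null (replacing the old CLASSPATH sentinel path), and the new toString() is what the scanners' log messages above now print. A minimal sketch of the convention, assuming it sits in the org.apache.kafka.connect.runtime.isolation package so the constructor used by PluginUtils below is accessible:

    package org.apache.kafka.connect.runtime.isolation;

    import java.net.URL;
    import java.nio.file.Path;

    class PluginSourceToStringSketch {
        static String describe(Path location, ClassLoader loader, URL[] urls) {
            PluginSource source = new PluginSource(location, loader, urls);
            // location != null -> isolated() == true,  toString() == the plugin location path
            // location == null -> isolated() == false, toString() == "classpath"
            return source + " (isolated=" + source.isolated() + ")";
        }
    }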

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java (42 lines changed)

@@ -20,6 +20,7 @@ import org.reflections.util.ClasspathHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
@@ -197,7 +198,7 @@
return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
}
public static Set<Path> pluginLocations(String pluginPath) {
public static Set<Path> pluginLocations(String pluginPath, boolean failFast) {
if (pluginPath == null) {
return Collections.emptySet();
}
@@ -206,6 +207,12 @@
for (String path : pluginPathElements) {
try {
Path pluginPathElement = Paths.get(path).toAbsolutePath();
if (path.isEmpty()) {
log.warn("Plugin path element is empty, evaluating to {}.", pluginPathElement);
}
if (!Files.exists(pluginPathElement)) {
throw new FileNotFoundException(pluginPathElement.toString());
}
// Currently 'plugin.paths' property is a list of top-level directories
// containing plugins
if (Files.isDirectory(pluginPathElement)) {
@@ -214,6 +221,9 @@
pluginLocations.add(pluginPathElement);
}
} catch (InvalidPathException | IOException e) {
if (failFast) {
throw new RuntimeException(e);
}
log.error("Could not get listing for plugin path: {}. Ignoring.", path, e);
}
}
@@ -332,8 +342,19 @@
public static Set<PluginSource> pluginSources(Set<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) {
Set<PluginSource> pluginSources = new LinkedHashSet<>();
for (Path pluginLocation : pluginLocations) {
try {
pluginSources.add(isolatedPluginSource(pluginLocation, classLoader, factory));
} catch (InvalidPathException | MalformedURLException e) {
log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
} catch (IOException e) {
log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
}
}
pluginSources.add(classpathPluginSource(classLoader.getParent()));
return pluginSources;
}
public static PluginSource isolatedPluginSource(Path pluginLocation, ClassLoader parent, PluginClassLoaderFactory factory) throws IOException {
List<URL> pluginUrls = new ArrayList<>();
for (Path path : pluginUrls(pluginLocation)) {
pluginUrls.add(path.toUri().toURL());
@@ -342,23 +363,18 @@
PluginClassLoader loader = factory.newPluginClassLoader(
pluginLocation.toUri().toURL(),
urls,
classLoader
parent
);
pluginSources.add(new PluginSource(pluginLocation, loader, urls));
} catch (InvalidPathException | MalformedURLException e) {
log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
} catch (IOException e) {
log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
}
return new PluginSource(pluginLocation, loader, urls);
}
public static PluginSource classpathPluginSource(ClassLoader classLoader) {
List<URL> parentUrls = new ArrayList<>();
parentUrls.addAll(ClasspathHelper.forJavaClassPath());
parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent()));
pluginSources.add(new PluginSource(PluginSource.CLASSPATH, classLoader.getParent(), parentUrls.toArray(new URL[0])));
return pluginSources;
parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader));
return new PluginSource(null, classLoader, parentUrls.toArray(new URL[0]));
}
/**
* Return the simple class name of a plugin as {@code String}.
*
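
The key behavioral addition here is the failFast flag on pluginLocations: the worker keeps the old lenient behavior (log and skip bad plugin.path entries), while the new CLI asks for errors to be rethrown so the user sees them immediately. A small sketch under those assumptions (the paths are hypothetical):

    import org.apache.kafka.connect.runtime.isolation.PluginUtils;

    import java.nio.file.Path;
    import java.util.Set;

    public class PluginLocationsSketch {
        public static void main(String[] args) {
            String pluginPath = "/opt/connect/plugins,/does/not/exist"; // hypothetical plugin.path value

            // Lenient mode, as used by the Connect worker: missing entries are logged and skipped.
            Set<Path> lenient = PluginUtils.pluginLocations(pluginPath, false);

            // Fail-fast mode, as used by connect-plugin-path: the hunk above rethrows the
            // underlying exception wrapped in a RuntimeException.
            try {
                Set<Path> strict = PluginUtils.pluginLocations(pluginPath, true);
            } catch (RuntimeException e) {
                System.err.println("Bad plugin path element: " + e.getMessage());
            }
        }
    }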

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2 lines changed)

@@ -68,7 +68,7 @@ public class Plugins {
Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) {
String pluginPath = WorkerConfig.pluginPath(props);
PluginDiscoveryMode discoveryMode = WorkerConfig.pluginDiscovery(props);
Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath, false);
delegatingLoader = factory.newDelegatingClassLoader(parent);
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
scanResult = initLoaders(pluginSources, discoveryMode);

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java (8 lines changed)

@@ -117,26 +117,26 @@ public class ReflectionScanner extends PluginScanner {
plugins = reflections.getSubTypesOf((Class<T>) type.superClass());
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any {} in {} for URLs: {}",
type, source.location(), source.urls(), e);
type, source, source.urls(), e);
return Collections.emptySortedSet();
}
SortedSet<PluginDesc<T>> result = new TreeSet<>();
for (Class<? extends T> pluginKlass : plugins) {
if (!PluginUtils.isConcrete(pluginKlass)) {
log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source.location());
log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source);
continue;
}
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass, pluginKlass.getClassLoader(), source.location());
pluginKlass, pluginKlass.getClassLoader(), source);
continue;
}
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), type, source));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {} in {}: Unable to instantiate {}{}",
type.simpleName(), source.location(), pluginKlass.getSimpleName(),
type.simpleName(), source, pluginKlass.getSimpleName(),
reflectiveErrorDescription(e), e);
}
}

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java (32 lines changed)

@@ -125,7 +125,7 @@ public class TestPlugins {
/**
* A plugin which is incorrectly packaged, and is missing a superclass definition.
*/
BAD_PACKAGING_MISSING_SUPERCLASS("bad-packaging", "test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER),
BAD_PACKAGING_MISSING_SUPERCLASS("bad-packaging", "test.plugins.MissingSuperclassConverter", false, REMOVE_CLASS_FILTER),
/**
* A plugin which is packaged with other incorrectly packaged plugins, but itself has no issues loading.
*/
@@ -173,7 +173,35 @@
/**
* A ServiceLoader discovered plugin which subclasses another plugin which is present on the classpath
*/
SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY("subclass-of-classpath", "test.plugins.SubclassOfClasspathOverridePolicy");
SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY("subclass-of-classpath", "test.plugins.SubclassOfClasspathOverridePolicy"),
/**
* A converter which does not have a corresponding ServiceLoader manifest
*/
NON_MIGRATED_CONVERTER("non-migrated", "test.plugins.NonMigratedConverter", false),
/**
* A header converter which does not have a corresponding ServiceLoader manifest
*/
NON_MIGRATED_HEADER_CONVERTER("non-migrated", "test.plugins.NonMigratedHeaderConverter", false),
/**
* A plugin which implements multiple interfaces, and has ServiceLoader manifests for some interfaces and not others.
*/
NON_MIGRATED_MULTI_PLUGIN("non-migrated", "test.plugins.NonMigratedMultiPlugin", false),
/**
* A predicate which does not have a corresponding ServiceLoader manifest
*/
NON_MIGRATED_PREDICATE("non-migrated", "test.plugins.NonMigratedPredicate", false),
/**
* A sink connector which does not have a corresponding ServiceLoader manifest
*/
NON_MIGRATED_SINK_CONNECTOR("non-migrated", "test.plugins.NonMigratedSinkConnector", false),
/**
* A source connector which does not have a corresponding ServiceLoader manifest
*/
NON_MIGRATED_SOURCE_CONNECTOR("non-migrated", "test.plugins.NonMigratedSourceConnector", false),
/**
* A transformation which does not have a corresponding ServiceLoader manifest
*/
NON_MIGRATED_TRANSFORMATION("non-migrated", "test.plugins.NonMigratedTransformation", false);
private final String resourceDir;
private final String className;

connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy (new file, 16 lines)

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.NonMigratedMultiPlugin
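
This manifest deliberately registers NonMigratedMultiPlugin for only one of its interfaces (ConnectorClientConfigOverridePolicy), so ServiceLoader-based discovery sees that facet while its Converter, HeaderConverter, Predicate, and Transformation facets are only reachable via reflective scanning, which is exactly the partially-migrated case the new tool needs to report. A hedged sketch of how plain java.util.ServiceLoader behaves against such a manifest (the loader here is a stand-in for the plugin's isolated classloader):

    import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
    import org.apache.kafka.connect.storage.Converter;

    import java.util.ServiceLoader;

    public class PartialManifestSketch {
        public static void main(String[] args) {
            ClassLoader pluginLoader = PartialManifestSketch.class.getClassLoader(); // stand-in loader

            // Found: the manifest above lists test.plugins.NonMigratedMultiPlugin for this interface.
            ServiceLoader<ConnectorClientConfigOverridePolicy> policies =
                    ServiceLoader.load(ConnectorClientConfigOverridePolicy.class, pluginLoader);

            // Not found: there is no META-INF/services entry for Converter, even though the
            // class implements it, which is what makes the plugin only partially migrated.
            ServiceLoader<Converter> converters = ServiceLoader.load(Converter.class, pluginLoader);
        }
    }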

connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java (new file, 46 lines)

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import java.util.Map;
/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Class which is not migrated to include a service loader manifest.
*/
public final class NonMigratedConverter implements Converter {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
}
@Override
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
return null;
}
}

connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java (new file, 57 lines)

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.HeaderConverter;
import java.io.IOException;
import java.util.Map;
/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Class which is not migrated to include a service loader manifest.
*/
public class NonMigratedHeaderConverter implements HeaderConverter {
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return null;
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return new byte[0];
}
@Override
public ConfigDef config() {
return null;
}
@Override
public void close() throws IOException {
}
@Override
public void configure(Map<String, ?> configs) {
}
}

connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java (new file, 96 lines)

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.List;
import java.util.Map;
/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Class which is not migrated to include a service loader manifest.
*/
public final class NonMigratedMultiPlugin implements Converter, HeaderConverter, Predicate, Transformation, ConnectorClientConfigOverridePolicy {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return null;
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return new byte[0];
}
@Override
public ConnectRecord apply(ConnectRecord record) {
return null;
}
@Override
public ConfigDef config() {
return null;
}
@Override
public boolean test(ConnectRecord record) {
return false;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public List<ConfigValue> validate(ConnectorClientConfigRequest connectorClientConfigRequest) {
return null;
}
}

connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java (new file, 51 lines)

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Map;
/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Class which is not migrated to include a service loader manifest.
*/
public class NonMigratedPredicate implements Predicate {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ConfigDef config() {
return null;
}
@Override
public boolean test(ConnectRecord record) {
return false;
}
@Override
public void close() {
}
}

connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java (new file, 61 lines)

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import java.util.List;
import java.util.Map;
/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Class which is not migrated to include a service loader manifest.
*/
public class NonMigratedSinkConnector extends SinkConnector {
@Override
public void start(Map<String, String> props) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
@Override
public String version() {
return "1.0.0";
}
}

connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSourceConnector.java (new file, 61 lines)

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.List;
import java.util.Map;
/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Class which is not migrated to include a service loader manifest.
*/
public class NonMigratedSourceConnector extends SourceConnector {
@Override
public void start(Map<String, String> props) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
@Override
public String version() {
return "1.0.0";
}
}

connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java (new file, 50 lines)

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Map;
/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Class which is not migrated to include a service loader manifest.
*/
public class NonMigratedTransformation implements Transformation {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ConnectRecord apply(ConnectRecord record) {
return null;
}
@Override
public ConfigDef config() {
return null;
}
@Override
public void close() {
}
}

tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java (new file, 492 lines)

@@ -0,0 +1,492 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
import org.apache.kafka.connect.runtime.isolation.PluginSource;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ConnectPluginPath {
private static final String MANIFEST_PREFIX = "META-INF/services/";
public static final Object[] LIST_TABLE_COLUMNS = {
"pluginName",
"firstAlias",
"secondAlias",
"pluginVersion",
"pluginType",
"isLoadable",
"hasManifest",
"pluginLocation" // last because it is least important and most repetitive
};
public static final String NO_ALIAS = "N/A";
public static void main(String[] args) {
Exit.exit(mainNoExit(args, System.out, System.err));
}
public static int mainNoExit(String[] args, PrintStream out, PrintStream err) {
ArgumentParser parser = parser();
try {
Namespace namespace = parser.parseArgs(args);
Config config = parseConfig(parser, namespace, out);
runCommand(config);
return 0;
} catch (ArgumentParserException e) {
parser.handleError(e);
return 1;
} catch (TerseException e) {
err.println(e.getMessage());
return 2;
} catch (Throwable e) {
err.println(e.getMessage());
err.println(Utils.stackTrace(e));
return 3;
}
}
private static ArgumentParser parser() {
ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-plugin-path")
.defaultHelp(true)
.description("Manage plugins on the Connect plugin.path");
ArgumentParser listCommand = parser.addSubparsers()
.description("List information about plugins contained within the specified plugin locations")
.dest("subcommand")
.addParser("list");
ArgumentParser[] subparsers = new ArgumentParser[] {
listCommand,
};
for (ArgumentParser subparser : subparsers) {
ArgumentGroup pluginProviders = subparser.addArgumentGroup("plugin providers");
pluginProviders.addArgument("--plugin-location")
.setDefault(new ArrayList<>())
.action(Arguments.append())
.help("A single plugin location (jar file or directory)");
pluginProviders.addArgument("--plugin-path")
.setDefault(new ArrayList<>())
.action(Arguments.append())
.help("A comma-delimited list of locations containing plugins");
pluginProviders.addArgument("--worker-config")
.setDefault(new ArrayList<>())
.action(Arguments.append())
.help("A Connect worker configuration file");
}
return parser;
}
private static Config parseConfig(ArgumentParser parser, Namespace namespace, PrintStream out) throws ArgumentParserException, TerseException {
Set<Path> locations = parseLocations(parser, namespace);
String subcommand = namespace.getString("subcommand");
if (subcommand == null) {
throw new ArgumentParserException("No subcommand specified", parser);
}
switch (subcommand) {
case "list":
return new Config(Command.LIST, locations, out);
default:
throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser);
}
}
private static Set<Path> parseLocations(ArgumentParser parser, Namespace namespace) throws ArgumentParserException, TerseException {
List<String> rawLocations = new ArrayList<>(namespace.getList("plugin_location"));
List<String> rawPluginPaths = new ArrayList<>(namespace.getList("plugin_path"));
List<String> rawWorkerConfigs = new ArrayList<>(namespace.getList("worker_config"));
if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() && rawWorkerConfigs.isEmpty()) {
throw new ArgumentParserException("Must specify at least one --plugin-location, --plugin-path, or --worker-config", parser);
}
Set<Path> pluginLocations = new LinkedHashSet<>();
for (String rawWorkerConfig : rawWorkerConfigs) {
Properties properties;
try {
properties = Utils.loadProps(rawWorkerConfig);
} catch (IOException e) {
throw new TerseException("Unable to read worker config at " + rawWorkerConfig);
}
String pluginPath = properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG);
if (pluginPath != null) {
rawPluginPaths.add(pluginPath);
}
}
for (String rawPluginPath : rawPluginPaths) {
try {
pluginLocations.addAll(PluginUtils.pluginLocations(rawPluginPath, true));
} catch (UncheckedIOException e) {
throw new TerseException("Unable to parse plugin path " + rawPluginPath + ": " + e.getMessage());
}
}
for (String rawLocation : rawLocations) {
Path pluginLocation = Paths.get(rawLocation);
if (!pluginLocation.toFile().exists()) {
throw new TerseException("Specified location " + pluginLocation + " does not exist");
}
pluginLocations.add(pluginLocation);
}
return pluginLocations;
}
enum Command {
LIST
}
private static class Config {
private final Command command;
private final Set<Path> locations;
private final PrintStream out;
private Config(Command command, Set<Path> locations, PrintStream out) {
this.command = command;
this.locations = locations;
this.out = out;
}
@Override
public String toString() {
return "Config{" +
"command=" + command +
", locations=" + locations +
'}';
}
}
public static void runCommand(Config config) throws TerseException {
try {
ClassLoader parent = ConnectPluginPath.class.getClassLoader();
ServiceLoaderScanner serviceLoaderScanner = new ServiceLoaderScanner();
ReflectionScanner reflectionScanner = new ReflectionScanner();
// Process the contents of the classpath to exclude it from later results.
PluginSource classpathSource = PluginUtils.classpathPluginSource(parent);
Map<String, List<ManifestEntry>> classpathManifests = findManifests(classpathSource, Collections.emptyMap());
PluginScanResult classpathPlugins = discoverPlugins(classpathSource, reflectionScanner, serviceLoaderScanner);
Map<Path, Set<Row>> rowsByLocation = new LinkedHashMap<>();
Set<Row> classpathRows = enumerateRows(null, classpathManifests, classpathPlugins);
rowsByLocation.put(null, classpathRows);
ClassLoaderFactory factory = new ClassLoaderFactory();
try (DelegatingClassLoader delegatingClassLoader = factory.newDelegatingClassLoader(parent)) {
beginCommand(config);
for (Path pluginLocation : config.locations) {
PluginSource source = PluginUtils.isolatedPluginSource(pluginLocation, delegatingClassLoader, factory);
Map<String, List<ManifestEntry>> manifests = findManifests(source, classpathManifests);
PluginScanResult plugins = discoverPlugins(source, reflectionScanner, serviceLoaderScanner);
Set<Row> rows = enumerateRows(pluginLocation, manifests, plugins);
rowsByLocation.put(pluginLocation, rows);
for (Row row : rows) {
handlePlugin(config, row);
}
}
endCommand(config, rowsByLocation);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/**
* The unit of work for a command.
* <p>This is unique to the (source, class, type) tuple, and contains additional pre-computed information
* that pertains to this specific plugin.
*/
private static class Row {
private final Path pluginLocation;
private final String className;
private final PluginType type;
private final String version;
private final List<String> aliases;
private final boolean loadable;
private final boolean hasManifest;
public Row(Path pluginLocation, String className, PluginType type, String version, List<String> aliases, boolean loadable, boolean hasManifest) {
this.pluginLocation = pluginLocation;
this.className = Objects.requireNonNull(className, "className must be non-null");
this.version = Objects.requireNonNull(version, "version must be non-null");
this.type = Objects.requireNonNull(type, "type must be non-null");
this.aliases = Objects.requireNonNull(aliases, "aliases must be non-null");
this.loadable = loadable;
this.hasManifest = hasManifest;
}
private boolean loadable() {
return loadable;
}
private boolean compatible() {
return loadable && hasManifest;
}
private boolean incompatible() {
return !compatible();
}
private String locationString() {
return pluginLocation == null ? "classpath" : pluginLocation.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Row row = (Row) o;
return Objects.equals(pluginLocation, row.pluginLocation) && className.equals(row.className) && type == row.type;
}
@Override
public int hashCode() {
return Objects.hash(pluginLocation, className, type);
}
}
private static Set<Row> enumerateRows(Path pluginLocation, Map<String, List<ManifestEntry>> manifests, PluginScanResult scanResult) {
Set<Row> rows = new HashSet<>();
// Perform a deep copy of the manifests because we're going to be mutating our copy.
Map<String, Set<ManifestEntry>> unloadablePlugins = manifests.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue())));
scanResult.forEach(pluginDesc -> {
// Emit a loadable row for this scan result, since it was found during plugin discovery
Set<String> rowAliases = new LinkedHashSet<>();
rowAliases.add(PluginUtils.simpleName(pluginDesc));
rowAliases.add(PluginUtils.prunedName(pluginDesc));
rows.add(newRow(pluginLocation, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true, manifests));
// Remove the ManifestEntry if it has the same className and type as one of the loadable plugins.
unloadablePlugins.getOrDefault(pluginDesc.className(), Collections.emptySet()).removeIf(entry -> entry.type == pluginDesc.type());
});
unloadablePlugins.values().forEach(entries -> entries.forEach(entry -> {
// Emit a non-loadable row, since all the loadable rows showed up in the previous iteration.
// Two ManifestEntries may produce the same row if they have different URIs
rows.add(newRow(pluginLocation, entry.className, Collections.emptyList(), entry.type, PluginDesc.UNDEFINED_VERSION, false, manifests));
}));
return rows;
}
private static Row newRow(Path pluginLocation, String className, List<String> rowAliases, PluginType type, String version, boolean loadable, Map<String, List<ManifestEntry>> manifests) {
boolean hasManifest = manifests.containsKey(className) && manifests.get(className).stream().anyMatch(e -> e.type == type);
return new Row(pluginLocation, className, type, version, rowAliases, loadable, hasManifest);
}
private static void beginCommand(Config config) {
if (config.command == Command.LIST) {
// The list command prints a TSV-formatted table with details of the found plugins
// This is officially human-readable output with no guarantees for backwards-compatibility
// It should be reasonably easy to parse for ad-hoc scripting use-cases.
listTablePrint(config, LIST_TABLE_COLUMNS);
}
}
private static void handlePlugin(Config config, Row row) {
if (config.command == Command.LIST) {
String firstAlias = row.aliases.size() > 0 ? row.aliases.get(0) : NO_ALIAS;
String secondAlias = row.aliases.size() > 1 ? row.aliases.get(1) : NO_ALIAS;
listTablePrint(config,
row.className,
firstAlias,
secondAlias,
row.version,
row.type,
row.loadable,
row.hasManifest,
// last because it is least important and most repetitive
row.locationString()
);
}
}
private static void endCommand(
Config config,
Map<Path, Set<Row>> rowsByLocation
) {
if (config.command == Command.LIST) {
// end the table with an empty line to enable users to separate the table from the summary.
config.out.println();
rowsByLocation.remove(null);
Set<Row> isolatedRows = rowsByLocation.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
long totalPlugins = isolatedRows.size();
long loadablePlugins = isolatedRows.stream().filter(Row::loadable).count();
long compatiblePlugins = isolatedRows.stream().filter(Row::compatible).count();
config.out.printf("Total plugins: \t%d%n", totalPlugins);
config.out.printf("Loadable plugins: \t%d%n", loadablePlugins);
config.out.printf("Compatible plugins: \t%d%n", compatiblePlugins);
}
}
private static void listTablePrint(Config config, Object... args) {
if (ConnectPluginPath.LIST_TABLE_COLUMNS.length != args.length) {
throw new IllegalArgumentException("Table must have exactly " + ConnectPluginPath.LIST_TABLE_COLUMNS.length + " columns");
}
config.out.println(Stream.of(args)
.map(Objects::toString)
.collect(Collectors.joining("\t")));
}
private static PluginScanResult discoverPlugins(PluginSource source, ReflectionScanner reflectionScanner, ServiceLoaderScanner serviceLoaderScanner) {
PluginScanResult serviceLoadResult = serviceLoaderScanner.discoverPlugins(Collections.singleton(source));
PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Collections.singleton(source));
return new PluginScanResult(Arrays.asList(serviceLoadResult, reflectiveResult));
}
private static class ManifestEntry {
private final URI manifestURI;
private final String className;
private final PluginType type;
private ManifestEntry(URI manifestURI, String className, PluginType type) {
this.manifestURI = manifestURI;
this.className = className;
this.type = type;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ManifestEntry that = (ManifestEntry) o;
return manifestURI.equals(that.manifestURI) && className.equals(that.className) && type == that.type;
}
@Override
public int hashCode() {
return Objects.hash(manifestURI, className, type);
}
}
private static Map<String, List<ManifestEntry>> findManifests(PluginSource source, Map<String, List<ManifestEntry>> exclude) {
Map<String, List<ManifestEntry>> manifests = new LinkedHashMap<>();
for (PluginType type : PluginType.values()) {
try {
Enumeration<URL> resources = source.loader().getResources(MANIFEST_PREFIX + type.superClass().getName());
while (resources.hasMoreElements()) {
URL url = resources.nextElement();
for (String className : parse(url)) {
ManifestEntry e = new ManifestEntry(url.toURI(), className, type);
manifests.computeIfAbsent(className, ignored -> new ArrayList<>()).add(e);
}
}
} catch (URISyntaxException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
for (Map.Entry<String, List<ManifestEntry>> entry : exclude.entrySet()) {
String className = entry.getKey();
List<ManifestEntry> excluded = entry.getValue();
// Note this must be a remove and not removeAll, because we want to remove only one copy at a time.
// If the same jar is present on the classpath and plugin path, then manifests will contain 2 identical
// ManifestEntry instances, with a third copy in the excludes. After the excludes are processed,
// manifests should contain exactly one copy of the ManifestEntry.
for (ManifestEntry e : excluded) {
manifests.getOrDefault(className, Collections.emptyList()).remove(e);
}
}
return manifests;
}
// Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11
private static Set<String> parse(URL u) {
Set<String> names = new LinkedHashSet<>(); // preserve insertion order
try {
URLConnection uc = u.openConnection();
uc.setUseCaches(false);
try (InputStream in = uc.getInputStream();
BufferedReader r
= new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
int lc = 1;
while ((lc = parseLine(u, r, lc, names)) >= 0) {
// pass
}
}
} catch (IOException x) {
throw new RuntimeException("Error accessing configuration file", x);
}
return names;
}
// Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11
private static int parseLine(URL u, BufferedReader r, int lc, Set<String> names) throws IOException {
String ln = r.readLine();
if (ln == null) {
return -1;
}
int ci = ln.indexOf('#');
if (ci >= 0) ln = ln.substring(0, ci);
ln = ln.trim();
int n = ln.length();
if (n != 0) {
if ((ln.indexOf(' ') >= 0) || (ln.indexOf('\t') >= 0))
throw new IOException("Illegal configuration-file syntax in " + u);
int cp = ln.codePointAt(0);
if (!Character.isJavaIdentifierStart(cp))
throw new IOException("Illegal provider-class name: " + ln + " in " + u);
int start = Character.charCount(cp);
for (int i = start; i < n; i += Character.charCount(cp)) {
cp = ln.codePointAt(i);
if (!Character.isJavaIdentifierPart(cp) && (cp != '.'))
throw new IOException("Illegal provider-class name: " + ln + " in " + u);
}
names.add(ln);
}
return lc + 1;
}
}
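
For reference, mainNoExit is the non-exiting entry point the tests below use: it returns 0 on success, 1 for argument-parsing errors, 2 for TerseExceptions, and 3 for anything else, and writes the tab-separated table plus summary to the supplied stream. A brief sketch of driving it directly (the worker config path is hypothetical):

    import org.apache.kafka.tools.ConnectPluginPath;

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class ConnectPluginPathListSketch {
        public static void main(String[] args) {
            ByteArrayOutputStream stdout = new ByteArrayOutputStream();
            ByteArrayOutputStream stderr = new ByteArrayOutputStream();

            int rc = ConnectPluginPath.mainNoExit(
                    new String[] {"list", "--worker-config", "config/connect-distributed.properties"},
                    new PrintStream(stdout),
                    new PrintStream(stderr));

            // stdout now holds the TSV header row, one row per plugin, a blank line,
            // and the Total/Loadable/Compatible summary printed by endCommand above.
            System.out.print(stdout);
            System.out.println("return code: " + rc);
        }
    }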

tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java (new file, 516 lines)

@@ -0,0 +1,516 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
import org.apache.kafka.connect.runtime.isolation.PluginSource;
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConnectPluginPathTest {
private static final Logger log = LoggerFactory.getLogger(ConnectPluginPathTest.class);
private static final int NAME_COL = 0;
private static final int ALIAS1_COL = 1;
private static final int ALIAS2_COL = 2;
private static final int VERSION_COL = 3;
private static final int TYPE_COL = 4;
private static final int LOADABLE_COL = 5;
private static final int MANIFEST_COL = 6;
private static final int LOCATION_COL = 7;
@TempDir
public Path workspace;
@BeforeAll
public static void setUp() {
// Work around a circular-dependency in TestPlugins.
TestPlugins.pluginPath();
}
@Test
public void testNoArguments() {
CommandResult res = runCommand();
assertNotEquals(0, res.returnCode);
}
@Test
public void testListNoArguments() {
CommandResult res = runCommand(
"list"
);
assertNotEquals(0, res.returnCode);
}
@ParameterizedTest
@EnumSource
public void testListOneLocation(PluginLocationType type) {
CommandResult res = runCommand(
"list",
"--plugin-location",
setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN)
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertNonMigratedPluginsPresent(table);
}
@ParameterizedTest
@EnumSource
public void testListMultipleLocations(PluginLocationType type) {
CommandResult res = runCommand(
"list",
"--plugin-location",
setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN),
"--plugin-location",
setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE)
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertNonMigratedPluginsPresent(table);
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE);
}
@ParameterizedTest
@EnumSource
public void testListOnePluginPath(PluginLocationType type) {
CommandResult res = runCommand(
"list",
"--plugin-path",
setupPluginPathElement(workspace.resolve("path-a"), type,
TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE)
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE);
}
@ParameterizedTest
@EnumSource
public void testListMultiplePluginPaths(PluginLocationType type) {
CommandResult res = runCommand(
"list",
"--plugin-path",
setupPluginPathElement(workspace.resolve("path-a"), type,
TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE),
"--plugin-path",
setupPluginPathElement(workspace.resolve("path-b"), type,
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER, TestPlugins.TestPlugin.ALIASED_STATIC_FIELD)
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE,
TestPlugins.TestPlugin.ALIASED_STATIC_FIELD);
}
@ParameterizedTest
@EnumSource
public void testListOneWorkerConfig(PluginLocationType type) {
CommandResult res = runCommand(
"list",
"--worker-config",
setupWorkerConfig(workspace.resolve("worker.properties"),
setupPluginPathElement(workspace.resolve("path-a"), type,
TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED))
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertBadPackagingPluginsPresent(table);
}
@ParameterizedTest
@EnumSource
public void testListMultipleWorkerConfigs(PluginLocationType type) {
CommandResult res = runCommand(
"list",
"--worker-config",
setupWorkerConfig(workspace.resolve("worker-a.properties"),
setupPluginPathElement(workspace.resolve("path-a"), type,
TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN)),
"--worker-config",
setupWorkerConfig(workspace.resolve("worker-b.properties"),
setupPluginPathElement(workspace.resolve("path-b"), type,
TestPlugins.TestPlugin.SERVICE_LOADER))
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertNonMigratedPluginsPresent(table);
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.SERVICE_LOADER);
}
private static Map<String, List<String[]>> assertListSuccess(CommandResult result) {
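// A successful list invocation exits with code 0 and prints a table containing every isolated plugin found by a reflective scan.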
assertEquals(0, result.returnCode);
Map<String, List<String[]>> table = parseTable(result.out);
assertIsolatedPluginsInOutput(result.reflective, table);
return table;
}
private static void assertPluginsAreCompatible(Map<String, List<String[]>> table, TestPlugins.TestPlugin... plugins) {
assertPluginMigrationStatus(table, true, true, plugins);
}
private static void assertNonMigratedPluginsPresent(Map<String, List<String[]>> table) {
assertPluginMigrationStatus(table, true, false,
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
TestPlugins.TestPlugin.NON_MIGRATED_HEADER_CONVERTER,
TestPlugins.TestPlugin.NON_MIGRATED_PREDICATE,
TestPlugins.TestPlugin.NON_MIGRATED_SINK_CONNECTOR,
TestPlugins.TestPlugin.NON_MIGRATED_SOURCE_CONNECTOR,
TestPlugins.TestPlugin.NON_MIGRATED_TRANSFORMATION);
// This plugin is partially compatible
assertPluginMigrationStatus(table, true, null,
TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
}
private static void assertBadPackagingPluginsPresent(Map<String, List<String[]>> table) {
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED,
TestPlugins.TestPlugin.BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR);
assertPluginMigrationStatus(table, false, true,
TestPlugins.TestPlugin.BAD_PACKAGING_MISSING_SUPERCLASS,
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_CONNECTOR,
TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR,
TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_PRIVATE_CONNECTOR,
TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONNECTOR,
TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONVERTER,
TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_OVERRIDE_POLICY,
TestPlugins.TestPlugin.BAD_PACKAGING_INNER_CLASS_CONNECTOR,
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION);
}
private static void assertIsolatedPluginsInOutput(PluginScanResult reflectiveResult, Map<String, List<String[]>> table) {
reflectiveResult.forEach(pluginDesc -> {
if (pluginDesc.location().equals("classpath")) {
// Classpath plugins do not appear in list output
return;
}
assertTrue(table.containsKey(pluginDesc.className()), "Plugin " + pluginDesc.className() + " does not appear in list output");
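// Find the row whose type column matches this plugin descriptor, and verify its alias, version, and location columns.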
boolean foundType = false;
for (String[] row : table.get(pluginDesc.className())) {
if (row[TYPE_COL].equals(pluginDesc.typeName())) {
foundType = true;
assertTrue(row[ALIAS1_COL].equals(ConnectPluginPath.NO_ALIAS) || row[ALIAS1_COL].equals(PluginUtils.simpleName(pluginDesc)));
assertTrue(row[ALIAS2_COL].equals(ConnectPluginPath.NO_ALIAS) || row[ALIAS2_COL].equals(PluginUtils.prunedName(pluginDesc)));
assertEquals(pluginDesc.version(), row[VERSION_COL]);
try {
Path pluginLocation = Paths.get(row[LOCATION_COL]);
// This transforms the raw path `/path/to/somewhere` into the URL `file:/path/to/somewhere`
String pluginLocationUrl = pluginLocation.toUri().toURL().toString();
assertEquals(pluginDesc.location(), pluginLocationUrl);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}
}
assertTrue(foundType, "Plugin " + pluginDesc.className() + " does not have row for " + pluginDesc.typeName());
});
}
private static void assertPluginMigrationStatus(Map<String, List<String[]>> table, Boolean loadable, Boolean compatible, TestPlugins.TestPlugin... plugins) {
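// A null `loadable` or `compatible` argument skips the check for the corresponding column,
// e.g. for plugins whose classes are only partially compatible.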
for (TestPlugins.TestPlugin plugin : plugins) {
assertTrue(table.containsKey(plugin.className()), "Plugin " + plugin.className() + " does not appear in list output");
for (String[] row : table.get(plugin.className())) {
log.info("row" + Arrays.toString(row));
if (loadable != null) {
assertEquals(loadable, Boolean.parseBoolean(row[LOADABLE_COL]), "Plugin loadable column for " + plugin.className() + " incorrect");
}
if (compatible != null) {
assertEquals(compatible, Boolean.parseBoolean(row[MANIFEST_COL]), "Plugin hasManifest column for " + plugin.className() + " incorrect");
}
}
}
}
private enum PluginLocationType {
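// CLASS_HIERARCHY: the plugin jar is unpacked into a plain directory tree of class files and resources
// SINGLE_JAR: the plugin is a single standalone jar file
// MULTI_JAR: the plugin is a directory containing the plugin jar (and potentially other jars)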
CLASS_HIERARCHY,
SINGLE_JAR,
MULTI_JAR
}
private static class PluginLocation {
private final Path path;
private PluginLocation(Path path) {
this.path = path;
}
@Override
public String toString() {
return path.toString();
}
}
/**
* Populate a writable disk path to be usable as a single plugin location, such as a {@code --plugin-location} argument.
* @param path A non-existent path immediately within a writable directory, suggesting a location for this plugin
* @param type The format to which the on-disk plugin should conform
* @param plugin The plugin which should be written to the specified path
* @return The final usable path to this location, which may differ from the suggested input path
*/
private static PluginLocation setupLocation(Path path, PluginLocationType type, TestPlugins.TestPlugin plugin) {
try {
Path jarPath = TestPlugins.pluginPath(plugin).stream().findFirst().get();
switch (type) {
case CLASS_HIERARCHY: {
try (JarFile jarFile = new JarFile(jarPath.toFile())) {
jarFile.stream().forEach(jarEntry -> {
Path entryPath = path.resolve(jarEntry.getName());
try {
entryPath.getParent().toFile().mkdirs();
Files.copy(jarFile.getInputStream(jarEntry), entryPath, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
return new PluginLocation(path);
}
case SINGLE_JAR: {
Path outputJar = path.resolveSibling(path.getFileName() + ".jar");
outputJar.getParent().toFile().mkdirs();
Files.copy(jarPath, outputJar, StandardCopyOption.REPLACE_EXISTING);
return new PluginLocation(outputJar);
}
case MULTI_JAR: {
Path outputJar = path.resolve(jarPath.getFileName());
outputJar.getParent().toFile().mkdirs();
Files.copy(jarPath, outputJar, StandardCopyOption.REPLACE_EXISTING);
return new PluginLocation(path);
}
default:
throw new IllegalArgumentException("Unknown PluginLocationType: " + type);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static class PluginPathElement {
private final Path root;
private final List<PluginLocation> locations;
private PluginPathElement(Path root, List<PluginLocation> locations) {
this.root = root;
this.locations = locations;
}
@Override
public String toString() {
return root.toString();
}
}
/**
* Populate a writable disk path to be usable as a single {@code plugin.path} element providing the specified plugins.
* @param path A directory that should contain the populated plugins; it will be created if it does not exist
* @param type The format to which the on-disk plugins should conform
* @param plugins The plugins which should be written to the specified path
* @return The plugin path element, including the specific inner locations of the plugins that were written
*/
private PluginPathElement setupPluginPathElement(Path path, PluginLocationType type, TestPlugins.TestPlugin... plugins) {
List<PluginLocation> locations = new ArrayList<>();
for (int i = 0; i < plugins.length; i++) {
TestPlugins.TestPlugin plugin = plugins[i];
locations.add(setupLocation(path.resolve("plugin-" + i), type, plugin));
}
return new PluginPathElement(path, locations);
}
private static class WorkerConfig {
private final Path configFile;
private final List<PluginPathElement> pluginPathElements;
private WorkerConfig(Path configFile, List<PluginPathElement> pluginPathElements) {
this.configFile = configFile;
this.pluginPathElements = pluginPathElements;
}
@Override
public String toString() {
return configFile.toString();
}
}
/**
* Populate a writable disk path with a worker properties file whose {@code plugin.path} references the given elements.
* @param path The path at which the worker properties file should be written; parent directories are created if necessary
* @param pluginPathElements The {@code plugin.path} elements that the worker config should reference
* @return The written worker config, together with the plugin path elements it references
*/
private static WorkerConfig setupWorkerConfig(Path path, PluginPathElement... pluginPathElements) {
path.getParent().toFile().mkdirs();
Properties properties = new Properties();
String pluginPath = Arrays.stream(pluginPathElements)
.map(Object::toString)
.collect(Collectors.joining(", "));
properties.setProperty("plugin.path", pluginPath);
try (OutputStream outputStream = Files.newOutputStream(path)) {
properties.store(outputStream, "dummy worker properties file");
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return new WorkerConfig(path, Arrays.asList(pluginPathElements));
}
private static class CommandResult {
public CommandResult(int returnCode, String out, String err, PluginScanResult reflective, PluginScanResult serviceLoading) {
this.returnCode = returnCode;
this.out = out;
this.err = err;
this.reflective = reflective;
this.serviceLoading = serviceLoading;
}
int returnCode;
String out;
String err;
PluginScanResult reflective;
PluginScanResult serviceLoading;
}
private static CommandResult runCommand(Object... args) {
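// Runs the command with the given arguments (converted via toString) while capturing its stdout and stderr.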
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream err = new ByteArrayOutputStream();
try {
int returnCode = ConnectPluginPath.mainNoExit(
Arrays.stream(args)
.map(Object::toString)
.collect(Collectors.toList())
.toArray(new String[]{}),
new PrintStream(out, true, "utf-8"),
new PrintStream(err, true, "utf-8"));
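// Re-scan the same plugin locations directly with the runtime scanners, so tests can compare
// the command's printed output against the reflective and service-loader scan results.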
Set<Path> pluginLocations = getPluginLocations(args);
ClassLoader parent = ConnectPluginPath.class.getClassLoader();
ClassLoaderFactory factory = new ClassLoaderFactory();
try (DelegatingClassLoader delegatingClassLoader = factory.newDelegatingClassLoader(parent)) {
Set<PluginSource> sources = PluginUtils.pluginSources(pluginLocations, delegatingClassLoader, factory);
String stdout = new String(out.toByteArray(), StandardCharsets.UTF_8);
String stderr = new String(err.toByteArray(), StandardCharsets.UTF_8);
log.info("STDOUT:\n{}", stdout);
log.info("STDERR:\n{}", stderr);
return new CommandResult(
returnCode,
stdout,
stderr,
new ReflectionScanner().discoverPlugins(sources),
new ServiceLoaderScanner().discoverPlugins(sources)
);
} catch (IOException e) {
throw new RuntimeException(e);
}
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
private static Set<Path> getPluginLocations(Object[] args) {
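// Flatten WorkerConfig and PluginPathElement arguments down to the concrete PluginLocation paths they contain,
// ignoring non-location arguments such as sub-command and flag strings.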
return Arrays.stream(args)
.flatMap(obj -> {
if (obj instanceof WorkerConfig) {
return ((WorkerConfig) obj).pluginPathElements.stream();
} else {
return Stream.of(obj);
}
})
.flatMap(obj -> {
if (obj instanceof PluginPathElement) {
return ((PluginPathElement) obj).locations.stream();
} else {
return Stream.of(obj);
}
})
.map(obj -> {
if (obj instanceof PluginLocation) {
return ((PluginLocation) obj).path;
} else {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
}
/**
* Parse the main table of the list command.
* <p>Map is keyed on the plugin name, with a list of rows which referred to that name if there are multiple.
* Each row is pre-split into columns.
* @param listOutput An executed list command
* @return A parsed form of the table grouped by plugin class names
*/
private static Map<String, List<String[]>> parseTable(String listOutput) {
// Split on the empty line which should appear in the output.
String[] sections = listOutput.split("\n\\s*\n");
assertTrue(sections.length > 1, "No empty line in list output");
String[] rows = sections[0].split("\n");
Map<String, List<String[]>> table = new HashMap<>();
// Assert that the first row is the header
assertArrayEquals(ConnectPluginPath.LIST_TABLE_COLUMNS, rows[0].split("\t"), "Table header doesn't have the right columns");
// Skip the header to parse the rows in the table.
for (int i = 1; i < rows.length; i++) {
// Group rows by the plugin class name in the first column.
String[] row = rows[i].split("\t");
assertEquals(ConnectPluginPath.LIST_TABLE_COLUMNS.length, row.length, "Table row is the wrong length");
table.computeIfAbsent(row[NAME_COL], ignored -> new ArrayList<>()).add(row);
}
return table;
}
}