From f5655d31d3d527dae057240570162827c6a79fb2 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Fri, 11 Aug 2023 12:05:51 -0700 Subject: [PATCH] KAFKA-15030: Add connect-plugin-path command-line tool (#14064) Reviewers: Chris Egerton --- bin/connect-plugin-path.sh | 21 + bin/windows/connect-plugin-path.bat | 21 + build.gradle | 5 + checkstyle/import-control.xml | 2 + config/tools-log4j.properties | 3 + .../runtime/isolation/PluginScanner.java | 10 +- .../runtime/isolation/PluginSource.java | 8 +- .../runtime/isolation/PluginUtils.java | 50 +- .../connect/runtime/isolation/Plugins.java | 2 +- .../runtime/isolation/ReflectionScanner.java | 8 +- .../runtime/isolation/TestPlugins.java | 32 +- ...policy.ConnectorClientConfigOverridePolicy | 16 + .../test/plugins/NonMigratedConverter.java | 46 ++ .../plugins/NonMigratedHeaderConverter.java | 57 ++ .../test/plugins/NonMigratedMultiPlugin.java | 96 ++++ .../test/plugins/NonMigratedPredicate.java | 51 ++ .../plugins/NonMigratedSinkConnector.java | 61 +++ .../plugins/NonMigratedSourceConnector.java | 61 +++ .../plugins/NonMigratedTransformation.java | 50 ++ .../apache/kafka/tools/ConnectPluginPath.java | 492 +++++++++++++++++ .../kafka/tools/ConnectPluginPathTest.java | 516 ++++++++++++++++++ 21 files changed, 1576 insertions(+), 32 deletions(-) create mode 100755 bin/connect-plugin-path.sh create mode 100644 bin/windows/connect-plugin-path.bat create mode 100644 connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy create mode 100644 connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java create mode 100644 connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java create mode 100644 connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java create mode 100644 connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java create mode 100644 connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java create mode 100644 connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSourceConnector.java create mode 100644 connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java create mode 100644 tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java diff --git a/bin/connect-plugin-path.sh b/bin/connect-plugin-path.sh new file mode 100755 index 00000000000..50742061776 --- /dev/null +++ b/bin/connect-plugin-path.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then + export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G" +fi + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConnectPluginPath "$@" diff --git a/bin/windows/connect-plugin-path.bat b/bin/windows/connect-plugin-path.bat new file mode 100644 index 00000000000..3f64a8253e9 --- /dev/null +++ b/bin/windows/connect-plugin-path.bat @@ -0,0 +1,21 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +IF ["%KAFKA_HEAP_OPTS%"] EQU [""] ( + set KAFKA_HEAP_OPTS=-Xms256M -Xmx2G +) + +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConnectPluginPath %* \ No newline at end of file diff --git a/build.gradle b/build.gradle index c4287726aea..23c8b4b9ee9 100644 --- a/build.gradle +++ b/build.gradle @@ -1859,6 +1859,8 @@ project(':tools') { dependencies { implementation project(':clients') implementation project(':server-common') + implementation project(':connect:api') + implementation project(':connect:runtime') implementation project(':log4j-appender') implementation project(':tools:tools-api') implementation libs.argparse4j @@ -1877,6 +1879,9 @@ project(':tools') { testImplementation project(':core').sourceSets.test.output testImplementation project(':server-common') testImplementation project(':server-common').sourceSets.test.output + testImplementation project(':connect:api') + testImplementation project(':connect:runtime') + testImplementation project(':connect:runtime').sourceSets.test.output testImplementation libs.junitJupiter testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc. 
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 3fea3ebb701..44673899772 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -308,6 +308,8 @@ + + diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties index b19e343265f..b669a4e6381 100644 --- a/config/tools-log4j.properties +++ b/config/tools-log4j.properties @@ -19,3 +19,6 @@ log4j.appender.stderr=org.apache.log4j.ConsoleAppender log4j.appender.stderr.layout=org.apache.log4j.PatternLayout log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.appender.stderr.Target=System.err + +# for connect-plugin-path +log4j.logger.org.reflections=ERROR diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java index acb5b668cf3..4e99d3de53d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java @@ -72,7 +72,7 @@ public abstract class PluginScanner { } private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) { - log.info("Loading plugin from: {}", source.location()); + log.info("Loading plugin from: {}", source); if (log.isDebugEnabled()) { log.debug("Loading plugin urls: {}", Arrays.toString(source.urls())); } @@ -136,13 +136,13 @@ public abstract class PluginScanner { pluginImpl = handleLinkageError(type, source, iterator::next); } catch (ServiceConfigurationError t) { log.error("Failed to discover {} in {}{}", - type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + type.simpleName(), source, reflectiveErrorDescription(t.getCause()), t); continue; } Class pluginKlass = (Class) pluginImpl.getClass(); if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - type.simpleName(), pluginKlass.getClassLoader(), source.location()); + type.simpleName(), pluginKlass.getClassLoader(), source); continue; } result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), type, source)); @@ -181,14 +181,14 @@ public abstract class PluginScanner { || !Objects.equals(lastError.getClass(), t.getClass()) || !Objects.equals(lastError.getMessage(), t.getMessage())) { log.error("Failed to discover {} in {}{}", - type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + type.simpleName(), source, reflectiveErrorDescription(t.getCause()), t); } lastError = t; } } log.error("Received excessive ServiceLoader errors: assuming the runtime ServiceLoader implementation cannot " + "skip faulty implementations. 
Use a different JRE, or resolve LinkageErrors for plugins in {}", - source.location(), lastError); + source, lastError); throw lastError; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java index c7123d83661..6cc19343b5e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java @@ -18,13 +18,11 @@ package org.apache.kafka.connect.runtime.isolation; import java.net.URL; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Arrays; import java.util.Objects; public class PluginSource { - public static final Path CLASSPATH = Paths.get("classpath"); private final Path location; private final ClassLoader loader; private final URL[] urls; @@ -48,7 +46,7 @@ public class PluginSource { } public boolean isolated() { - return location != CLASSPATH; + return location != null; } @Override @@ -65,4 +63,8 @@ public class PluginSource { result = 31 * result + Arrays.hashCode(urls); return result; } + + public String toString() { + return location == null ? "classpath" : location.toString(); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index a673d6e54fd..d9036c03a4f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -20,6 +20,7 @@ import org.reflections.util.ClasspathHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Modifier; import java.net.MalformedURLException; @@ -197,7 +198,7 @@ public class PluginUtils { return path.toString().toLowerCase(Locale.ROOT).endsWith(".class"); } - public static Set pluginLocations(String pluginPath) { + public static Set pluginLocations(String pluginPath, boolean failFast) { if (pluginPath == null) { return Collections.emptySet(); } @@ -206,6 +207,12 @@ public class PluginUtils { for (String path : pluginPathElements) { try { Path pluginPathElement = Paths.get(path).toAbsolutePath(); + if (pluginPath.isEmpty()) { + log.warn("Plugin path element is empty, evaluating to {}.", pluginPathElement); + } + if (!Files.exists(pluginPathElement)) { + throw new FileNotFoundException(pluginPathElement.toString()); + } // Currently 'plugin.paths' property is a list of top-level directories // containing plugins if (Files.isDirectory(pluginPathElement)) { @@ -214,6 +221,9 @@ public class PluginUtils { pluginLocations.add(pluginPathElement); } } catch (InvalidPathException | IOException e) { + if (failFast) { + throw new RuntimeException(e); + } log.error("Could not get listing for plugin path: {}. 
Ignoring.", path, e); } } @@ -332,32 +342,38 @@ public class PluginUtils { public static Set pluginSources(Set pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) { Set pluginSources = new LinkedHashSet<>(); for (Path pluginLocation : pluginLocations) { - try { - List pluginUrls = new ArrayList<>(); - for (Path path : pluginUrls(pluginLocation)) { - pluginUrls.add(path.toUri().toURL()); - } - URL[] urls = pluginUrls.toArray(new URL[0]); - PluginClassLoader loader = factory.newPluginClassLoader( - pluginLocation.toUri().toURL(), - urls, - classLoader - ); - pluginSources.add(new PluginSource(pluginLocation, loader, urls)); + pluginSources.add(isolatedPluginSource(pluginLocation, classLoader, factory)); } catch (InvalidPathException | MalformedURLException e) { log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e); } catch (IOException e) { log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e); } } - List parentUrls = new ArrayList<>(); - parentUrls.addAll(ClasspathHelper.forJavaClassPath()); - parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent())); - pluginSources.add(new PluginSource(PluginSource.CLASSPATH, classLoader.getParent(), parentUrls.toArray(new URL[0]))); + pluginSources.add(classpathPluginSource(classLoader.getParent())); return pluginSources; } + public static PluginSource isolatedPluginSource(Path pluginLocation, ClassLoader parent, PluginClassLoaderFactory factory) throws IOException { + List pluginUrls = new ArrayList<>(); + for (Path path : pluginUrls(pluginLocation)) { + pluginUrls.add(path.toUri().toURL()); + } + URL[] urls = pluginUrls.toArray(new URL[0]); + PluginClassLoader loader = factory.newPluginClassLoader( + pluginLocation.toUri().toURL(), + urls, + parent + ); + return new PluginSource(pluginLocation, loader, urls); + } + + public static PluginSource classpathPluginSource(ClassLoader classLoader) { + List parentUrls = new ArrayList<>(); + parentUrls.addAll(ClasspathHelper.forJavaClassPath()); + parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader)); + return new PluginSource(null, classLoader, parentUrls.toArray(new URL[0])); + } /** * Return the simple class name of a plugin as {@code String}. 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 72fe40a50a5..8dd9afe93ec 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -68,7 +68,7 @@ public class Plugins { Plugins(Map props, ClassLoader parent, ClassLoaderFactory factory) { String pluginPath = WorkerConfig.pluginPath(props); PluginDiscoveryMode discoveryMode = WorkerConfig.pluginDiscovery(props); - Set pluginLocations = PluginUtils.pluginLocations(pluginPath); + Set pluginLocations = PluginUtils.pluginLocations(pluginPath, false); delegatingLoader = factory.newDelegatingClassLoader(parent); Set pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory); scanResult = initLoaders(pluginSources, discoveryMode); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index ad5c00c42ec..332f8ea9091 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -117,26 +117,26 @@ public class ReflectionScanner extends PluginScanner { plugins = reflections.getSubTypesOf((Class) type.superClass()); } catch (ReflectionsException e) { log.debug("Reflections scanner could not find any {} in {} for URLs: {}", - type, source.location(), source.urls(), e); + type, source, source.urls(), e); return Collections.emptySortedSet(); } SortedSet> result = new TreeSet<>(); for (Class pluginKlass : plugins) { if (!PluginUtils.isConcrete(pluginKlass)) { - log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source.location()); + log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source); continue; } if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - pluginKlass, pluginKlass.getClassLoader(), source.location()); + pluginKlass, pluginKlass.getClassLoader(), source); continue; } try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), type, source)); } catch (ReflectiveOperationException | LinkageError e) { log.error("Failed to discover {} in {}: Unable to instantiate {}{}", - type.simpleName(), source.location(), pluginKlass.getSimpleName(), + type.simpleName(), source, pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index d93fef181aa..d6fd3cd54bc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -125,7 +125,7 @@ public class TestPlugins { /** * A plugin which is incorrectly packaged, and is missing a superclass definition. 
*/ - BAD_PACKAGING_MISSING_SUPERCLASS("bad-packaging", "test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER), + BAD_PACKAGING_MISSING_SUPERCLASS("bad-packaging", "test.plugins.MissingSuperclassConverter", false, REMOVE_CLASS_FILTER), /** * A plugin which is packaged with other incorrectly packaged plugins, but itself has no issues loading. */ @@ -173,7 +173,35 @@ public class TestPlugins { /** * A ServiceLoader discovered plugin which subclasses another plugin which is present on the classpath */ - SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY("subclass-of-classpath", "test.plugins.SubclassOfClasspathOverridePolicy"); + SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY("subclass-of-classpath", "test.plugins.SubclassOfClasspathOverridePolicy"), + /** + * A converter which does not have a corresponding ServiceLoader manifest + */ + NON_MIGRATED_CONVERTER("non-migrated", "test.plugins.NonMigratedConverter", false), + /** + * A header converter which does not have a corresponding ServiceLoader manifest + */ + NON_MIGRATED_HEADER_CONVERTER("non-migrated", "test.plugins.NonMigratedHeaderConverter", false), + /** + * A plugin which implements multiple interfaces, and has ServiceLoader manifests for some interfaces and not others. + */ + NON_MIGRATED_MULTI_PLUGIN("non-migrated", "test.plugins.NonMigratedMultiPlugin", false), + /** + * A predicate which does not have a corresponding ServiceLoader manifest + */ + NON_MIGRATED_PREDICATE("non-migrated", "test.plugins.NonMigratedPredicate", false), + /** + * A sink connector which does not have a corresponding ServiceLoader manifest + */ + NON_MIGRATED_SINK_CONNECTOR("non-migrated", "test.plugins.NonMigratedSinkConnector", false), + /** + * A source connector which does not have a corresponding ServiceLoader manifest + */ + NON_MIGRATED_SOURCE_CONNECTOR("non-migrated", "test.plugins.NonMigratedSourceConnector", false), + /** + * A transformation which does not have a corresponding ServiceLoader manifest + */ + NON_MIGRATED_TRANSFORMATION("non-migrated", "test.plugins.NonMigratedTransformation", false); private final String resourceDir; private final String className; diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy b/connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy new file mode 100644 index 00000000000..82400f7255d --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. 
+ +test.plugins.NonMigratedMultiPlugin diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java new file mode 100644 index 00000000000..d2be39384ad --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; + +import java.util.Map; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *
<p>
Class which is not migrated to include a service loader manifest. + */ +public final class NonMigratedConverter implements Converter { + + @Override + public void configure(final Map configs, final boolean isKey) { + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + return null; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java new file mode 100644 index 00000000000..9e013303a56 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.HeaderConverter; + +import java.io.IOException; +import java.util.Map; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *
<p>
Class which is not migrated to include a service loader manifest. + */ +public class NonMigratedHeaderConverter implements HeaderConverter { + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return null; + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java new file mode 100644 index 00000000000..a82b82f2974 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; +import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.transforms.predicates.Predicate; +import org.apache.kafka.connect.transforms.Transformation; + +import java.util.List; +import java.util.Map; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *
<p>
Class which is not migrated to include a service loader manifest. + */ +public final class NonMigratedMultiPlugin implements Converter, HeaderConverter, Predicate, Transformation, ConnectorClientConfigOverridePolicy { + + @Override + public void configure(Map configs, boolean isKey) { + + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + return null; + } + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return null; + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public ConnectRecord apply(ConnectRecord record) { + return null; + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public boolean test(ConnectRecord record) { + return false; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } + + @Override + public List validate(ConnectorClientConfigRequest connectorClientConfigRequest) { + return null; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java new file mode 100644 index 00000000000..3465d0b4633 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.predicates.Predicate; + +import java.util.Map; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *
<p>
Class which is not migrated to include a service loader manifest. + */ +public class NonMigratedPredicate implements Predicate { + + @Override + public void configure(Map configs) { + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public boolean test(ConnectRecord record) { + return false; + } + + @Override + public void close() { + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java new file mode 100644 index 00000000000..b9678a3382f --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; + +import java.util.List; +import java.util.Map; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *
<p>
Class which is not migrated to include a service loader manifest. + */ +public class NonMigratedSinkConnector extends SinkConnector { + + @Override + public void start(Map props) { + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public String version() { + return "1.0.0"; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSourceConnector.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSourceConnector.java new file mode 100644 index 00000000000..edbba6df3fe --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSourceConnector.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; + +import java.util.List; +import java.util.Map; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *
<p>
Class which is not migrated to include a service loader manifest. + */ +public class NonMigratedSourceConnector extends SourceConnector { + + @Override + public void start(Map props) { + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public String version() { + return "1.0.0"; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java new file mode 100644 index 00000000000..c3a732568f7 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedTransformation.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; + +import java.util.Map; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *
<p>
Class which is not migrated to include a service loader manifest. + */ +public class NonMigratedTransformation implements Transformation { + + @Override + public void configure(Map configs) { + } + + @Override + public ConnectRecord apply(ConnectRecord record) { + return null; + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public void close() { + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java new file mode 100644 index 00000000000..c1b0f55259f --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.ArgumentGroup; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.PluginScanResult; +import org.apache.kafka.connect.runtime.isolation.PluginSource; +import org.apache.kafka.connect.runtime.isolation.PluginType; +import org.apache.kafka.connect.runtime.isolation.PluginUtils; +import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; +import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ConnectPluginPath { + + private static final String MANIFEST_PREFIX = "META-INF/services/"; + public 
static final Object[] LIST_TABLE_COLUMNS = { + "pluginName", + "firstAlias", + "secondAlias", + "pluginVersion", + "pluginType", + "isLoadable", + "hasManifest", + "pluginLocation" // last because it is least important and most repetitive + }; + public static final String NO_ALIAS = "N/A"; + + public static void main(String[] args) { + Exit.exit(mainNoExit(args, System.out, System.err)); + } + + public static int mainNoExit(String[] args, PrintStream out, PrintStream err) { + ArgumentParser parser = parser(); + try { + Namespace namespace = parser.parseArgs(args); + Config config = parseConfig(parser, namespace, out); + runCommand(config); + return 0; + } catch (ArgumentParserException e) { + parser.handleError(e); + return 1; + } catch (TerseException e) { + err.println(e.getMessage()); + return 2; + } catch (Throwable e) { + err.println(e.getMessage()); + err.println(Utils.stackTrace(e)); + return 3; + } + } + + private static ArgumentParser parser() { + ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-plugin-path") + .defaultHelp(true) + .description("Manage plugins on the Connect plugin.path"); + + ArgumentParser listCommand = parser.addSubparsers() + .description("List information about plugins contained within the specified plugin locations") + .dest("subcommand") + .addParser("list"); + + ArgumentParser[] subparsers = new ArgumentParser[] { + listCommand, + }; + + for (ArgumentParser subparser : subparsers) { + ArgumentGroup pluginProviders = subparser.addArgumentGroup("plugin providers"); + pluginProviders.addArgument("--plugin-location") + .setDefault(new ArrayList<>()) + .action(Arguments.append()) + .help("A single plugin location (jar file or directory)"); + + pluginProviders.addArgument("--plugin-path") + .setDefault(new ArrayList<>()) + .action(Arguments.append()) + .help("A comma-delimited list of locations containing plugins"); + + pluginProviders.addArgument("--worker-config") + .setDefault(new ArrayList<>()) + .action(Arguments.append()) + .help("A Connect worker configuration file"); + } + + return parser; + } + + private static Config parseConfig(ArgumentParser parser, Namespace namespace, PrintStream out) throws ArgumentParserException, TerseException { + Set locations = parseLocations(parser, namespace); + String subcommand = namespace.getString("subcommand"); + if (subcommand == null) { + throw new ArgumentParserException("No subcommand specified", parser); + } + switch (subcommand) { + case "list": + return new Config(Command.LIST, locations, out); + default: + throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser); + } + } + + private static Set parseLocations(ArgumentParser parser, Namespace namespace) throws ArgumentParserException, TerseException { + List rawLocations = new ArrayList<>(namespace.getList("plugin_location")); + List rawPluginPaths = new ArrayList<>(namespace.getList("plugin_path")); + List rawWorkerConfigs = new ArrayList<>(namespace.getList("worker_config")); + if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() && rawWorkerConfigs.isEmpty()) { + throw new ArgumentParserException("Must specify at least one --plugin-location, --plugin-path, or --worker-config", parser); + } + Set pluginLocations = new LinkedHashSet<>(); + for (String rawWorkerConfig : rawWorkerConfigs) { + Properties properties; + try { + properties = Utils.loadProps(rawWorkerConfig); + } catch (IOException e) { + throw new TerseException("Unable to read worker config at " + rawWorkerConfig); + } + String pluginPath = 
properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG); + if (pluginPath != null) { + rawPluginPaths.add(pluginPath); + } + } + for (String rawPluginPath : rawPluginPaths) { + try { + pluginLocations.addAll(PluginUtils.pluginLocations(rawPluginPath, true)); + } catch (UncheckedIOException e) { + throw new TerseException("Unable to parse plugin path " + rawPluginPath + ": " + e.getMessage()); + } + } + for (String rawLocation : rawLocations) { + Path pluginLocation = Paths.get(rawLocation); + if (!pluginLocation.toFile().exists()) { + throw new TerseException("Specified location " + pluginLocation + " does not exist"); + } + pluginLocations.add(pluginLocation); + } + return pluginLocations; + } + + enum Command { + LIST + } + + private static class Config { + private final Command command; + private final Set locations; + private final PrintStream out; + + private Config(Command command, Set locations, PrintStream out) { + this.command = command; + this.locations = locations; + this.out = out; + } + + @Override + public String toString() { + return "Config{" + + "command=" + command + + ", locations=" + locations + + '}'; + } + } + + public static void runCommand(Config config) throws TerseException { + try { + ClassLoader parent = ConnectPluginPath.class.getClassLoader(); + ServiceLoaderScanner serviceLoaderScanner = new ServiceLoaderScanner(); + ReflectionScanner reflectionScanner = new ReflectionScanner(); + // Process the contents of the classpath to exclude it from later results. + PluginSource classpathSource = PluginUtils.classpathPluginSource(parent); + Map> classpathManifests = findManifests(classpathSource, Collections.emptyMap()); + PluginScanResult classpathPlugins = discoverPlugins(classpathSource, reflectionScanner, serviceLoaderScanner); + Map> rowsByLocation = new LinkedHashMap<>(); + Set classpathRows = enumerateRows(null, classpathManifests, classpathPlugins); + rowsByLocation.put(null, classpathRows); + + ClassLoaderFactory factory = new ClassLoaderFactory(); + try (DelegatingClassLoader delegatingClassLoader = factory.newDelegatingClassLoader(parent)) { + beginCommand(config); + for (Path pluginLocation : config.locations) { + PluginSource source = PluginUtils.isolatedPluginSource(pluginLocation, delegatingClassLoader, factory); + Map> manifests = findManifests(source, classpathManifests); + PluginScanResult plugins = discoverPlugins(source, reflectionScanner, serviceLoaderScanner); + Set rows = enumerateRows(pluginLocation, manifests, plugins); + rowsByLocation.put(pluginLocation, rows); + for (Row row : rows) { + handlePlugin(config, row); + } + } + endCommand(config, rowsByLocation); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * The unit of work for a command. + *
<p>
This is unique to the (source, class, type) tuple, and contains additional pre-computed information + * that pertains to this specific plugin. + */ + private static class Row { + private final Path pluginLocation; + private final String className; + private final PluginType type; + private final String version; + private final List aliases; + private final boolean loadable; + private final boolean hasManifest; + + public Row(Path pluginLocation, String className, PluginType type, String version, List aliases, boolean loadable, boolean hasManifest) { + this.pluginLocation = pluginLocation; + this.className = Objects.requireNonNull(className, "className must be non-null"); + this.version = Objects.requireNonNull(version, "version must be non-null"); + this.type = Objects.requireNonNull(type, "type must be non-null"); + this.aliases = Objects.requireNonNull(aliases, "aliases must be non-null"); + this.loadable = loadable; + this.hasManifest = hasManifest; + } + + private boolean loadable() { + return loadable; + } + + private boolean compatible() { + return loadable && hasManifest; + } + + private boolean incompatible() { + return !compatible(); + } + + private String locationString() { + return pluginLocation == null ? "classpath" : pluginLocation.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Row row = (Row) o; + return Objects.equals(pluginLocation, row.pluginLocation) && className.equals(row.className) && type == row.type; + } + + @Override + public int hashCode() { + return Objects.hash(pluginLocation, className, type); + } + } + + private static Set enumerateRows(Path pluginLocation, Map> manifests, PluginScanResult scanResult) { + Set rows = new HashSet<>(); + // Perform a deep copy of the manifests because we're going to be mutating our copy. + Map> unloadablePlugins = manifests.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()))); + scanResult.forEach(pluginDesc -> { + // Emit a loadable row for this scan result, since it was found during plugin discovery + Set rowAliases = new LinkedHashSet<>(); + rowAliases.add(PluginUtils.simpleName(pluginDesc)); + rowAliases.add(PluginUtils.prunedName(pluginDesc)); + rows.add(newRow(pluginLocation, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true, manifests)); + // Remove the ManifestEntry if it has the same className and type as one of the loadable plugins. + unloadablePlugins.getOrDefault(pluginDesc.className(), Collections.emptySet()).removeIf(entry -> entry.type == pluginDesc.type()); + }); + unloadablePlugins.values().forEach(entries -> entries.forEach(entry -> { + // Emit a non-loadable row, since all the loadable rows showed up in the previous iteration. 
+ // Two ManifestEntries may produce the same row if they have different URIs + rows.add(newRow(pluginLocation, entry.className, Collections.emptyList(), entry.type, PluginDesc.UNDEFINED_VERSION, false, manifests)); + })); + return rows; + } + + private static Row newRow(Path pluginLocation, String className, List rowAliases, PluginType type, String version, boolean loadable, Map> manifests) { + boolean hasManifest = manifests.containsKey(className) && manifests.get(className).stream().anyMatch(e -> e.type == type); + return new Row(pluginLocation, className, type, version, rowAliases, loadable, hasManifest); + } + + private static void beginCommand(Config config) { + if (config.command == Command.LIST) { + // The list command prints a TSV-formatted table with details of the found plugins + // This is officially human-readable output with no guarantees for backwards-compatibility + // It should be reasonably easy to parse for ad-hoc scripting use-cases. + listTablePrint(config, LIST_TABLE_COLUMNS); + } + } + + private static void handlePlugin(Config config, Row row) { + if (config.command == Command.LIST) { + String firstAlias = row.aliases.size() > 0 ? row.aliases.get(0) : NO_ALIAS; + String secondAlias = row.aliases.size() > 1 ? row.aliases.get(1) : NO_ALIAS; + listTablePrint(config, + row.className, + firstAlias, + secondAlias, + row.version, + row.type, + row.loadable, + row.hasManifest, + // last because it is least important and most repetitive + row.locationString() + ); + } + } + + private static void endCommand( + Config config, + Map> rowsByLocation + ) { + if (config.command == Command.LIST) { + // end the table with an empty line to enable users to separate the table from the summary. + config.out.println(); + rowsByLocation.remove(null); + Set isolatedRows = rowsByLocation.values().stream().flatMap(Set::stream).collect(Collectors.toSet()); + long totalPlugins = isolatedRows.size(); + long loadablePlugins = isolatedRows.stream().filter(Row::loadable).count(); + long compatiblePlugins = isolatedRows.stream().filter(Row::compatible).count(); + config.out.printf("Total plugins: \t%d%n", totalPlugins); + config.out.printf("Loadable plugins: \t%d%n", loadablePlugins); + config.out.printf("Compatible plugins: \t%d%n", compatiblePlugins); + } + } + + private static void listTablePrint(Config config, Object... 
args) { + if (ConnectPluginPath.LIST_TABLE_COLUMNS.length != args.length) { + throw new IllegalArgumentException("Table must have exactly " + ConnectPluginPath.LIST_TABLE_COLUMNS.length + " columns"); + } + config.out.println(Stream.of(args) + .map(Objects::toString) + .collect(Collectors.joining("\t"))); + } + + private static PluginScanResult discoverPlugins(PluginSource source, ReflectionScanner reflectionScanner, ServiceLoaderScanner serviceLoaderScanner) { + PluginScanResult serviceLoadResult = serviceLoaderScanner.discoverPlugins(Collections.singleton(source)); + PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Collections.singleton(source)); + return new PluginScanResult(Arrays.asList(serviceLoadResult, reflectiveResult)); + } + + private static class ManifestEntry { + private final URI manifestURI; + private final String className; + private final PluginType type; + + private ManifestEntry(URI manifestURI, String className, PluginType type) { + this.manifestURI = manifestURI; + this.className = className; + this.type = type; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ManifestEntry that = (ManifestEntry) o; + return manifestURI.equals(that.manifestURI) && className.equals(that.className) && type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(manifestURI, className, type); + } + } + + private static Map> findManifests(PluginSource source, Map> exclude) { + Map> manifests = new LinkedHashMap<>(); + for (PluginType type : PluginType.values()) { + try { + Enumeration resources = source.loader().getResources(MANIFEST_PREFIX + type.superClass().getName()); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + for (String className : parse(url)) { + ManifestEntry e = new ManifestEntry(url.toURI(), className, type); + manifests.computeIfAbsent(className, ignored -> new ArrayList<>()).add(e); + } + } + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + for (Map.Entry> entry : exclude.entrySet()) { + String className = entry.getKey(); + List excluded = entry.getValue(); + // Note this must be a remove and not removeAll, because we want to remove only one copy at a time. + // If the same jar is present on the classpath and plugin path, then manifests will contain 2 identical + // ManifestEntry instances, with a third copy in the excludes. After the excludes are processed, + // manifests should contain exactly one copy of the ManifestEntry. 
+ for (ManifestEntry e : excluded) { + manifests.getOrDefault(className, Collections.emptyList()).remove(e); + } + } + return manifests; + } + + // Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11 + private static Set parse(URL u) { + Set names = new LinkedHashSet<>(); // preserve insertion order + try { + URLConnection uc = u.openConnection(); + uc.setUseCaches(false); + try (InputStream in = uc.getInputStream(); + BufferedReader r + = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) { + int lc = 1; + while ((lc = parseLine(u, r, lc, names)) >= 0) { + // pass + } + } + } catch (IOException x) { + throw new RuntimeException("Error accessing configuration file", x); + } + return names; + } + + // Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11 + private static int parseLine(URL u, BufferedReader r, int lc, Set names) throws IOException { + String ln = r.readLine(); + if (ln == null) { + return -1; + } + int ci = ln.indexOf('#'); + if (ci >= 0) ln = ln.substring(0, ci); + ln = ln.trim(); + int n = ln.length(); + if (n != 0) { + if ((ln.indexOf(' ') >= 0) || (ln.indexOf('\t') >= 0)) + throw new IOException("Illegal configuration-file syntax in " + u); + int cp = ln.codePointAt(0); + if (!Character.isJavaIdentifierStart(cp)) + throw new IOException("Illegal provider-class name: " + ln + " in " + u); + int start = Character.charCount(cp); + for (int i = start; i < n; i += Character.charCount(cp)) { + cp = ln.codePointAt(i); + if (!Character.isJavaIdentifierPart(cp) && (cp != '.')) + throw new IOException("Illegal provider-class name: " + ln + " in " + u); + } + names.add(ln); + } + return lc + 1; + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java new file mode 100644 index 00000000000..6581c300728 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginScanResult; +import org.apache.kafka.connect.runtime.isolation.PluginSource; +import org.apache.kafka.connect.runtime.isolation.PluginUtils; +import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; +import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.io.UncheckedIOException; +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.jar.JarFile; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConnectPluginPathTest { + + private static final Logger log = LoggerFactory.getLogger(ConnectPluginPathTest.class); + + private static final int NAME_COL = 0; + private static final int ALIAS1_COL = 1; + private static final int ALIAS2_COL = 2; + private static final int VERSION_COL = 3; + private static final int TYPE_COL = 4; + private static final int LOADABLE_COL = 5; + private static final int MANIFEST_COL = 6; + private static final int LOCATION_COL = 7; + + @TempDir + public Path workspace; + + @BeforeAll + public static void setUp() { + // Work around a circular-dependency in TestPlugins. 
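+        // (Invoking pluginPath() once here presumably forces TestPlugins to compile and cache its test
+        // plugin jars before any individual test tries to resolve them.)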
+        TestPlugins.pluginPath();
+    }
+
+
+    @Test
+    public void testNoArguments() {
+        CommandResult res = runCommand();
+        assertNotEquals(0, res.returnCode);
+    }
+
+    @Test
+    public void testListNoArguments() {
+        CommandResult res = runCommand(
+                "list"
+        );
+        assertNotEquals(0, res.returnCode);
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    public void testListOneLocation(PluginLocationType type) {
+        CommandResult res = runCommand(
+                "list",
+                "--plugin-location",
+                setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN)
+        );
+        Map<String, List<String[]>> table = assertListSuccess(res);
+        assertNonMigratedPluginsPresent(table);
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    public void testListMultipleLocations(PluginLocationType type) {
+        CommandResult res = runCommand(
+                "list",
+                "--plugin-location",
+                setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN),
+                "--plugin-location",
+                setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE)
+        );
+        Map<String, List<String[]>> table = assertListSuccess(res);
+        assertNonMigratedPluginsPresent(table);
+        assertPluginsAreCompatible(table,
+                TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE);
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    public void testListOnePluginPath(PluginLocationType type) {
+        CommandResult res = runCommand(
+                "list",
+                "--plugin-path",
+                setupPluginPathElement(workspace.resolve("path-a"), type,
+                        TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE)
+        );
+        Map<String, List<String[]>> table = assertListSuccess(res);
+        assertPluginsAreCompatible(table,
+                TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE);
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    public void testListMultiplePluginPaths(PluginLocationType type) {
+        CommandResult res = runCommand(
+                "list",
+                "--plugin-path",
+                setupPluginPathElement(workspace.resolve("path-a"), type,
+                        TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE),
+                "--plugin-path",
+                setupPluginPathElement(workspace.resolve("path-b"), type,
+                        TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER, TestPlugins.TestPlugin.ALIASED_STATIC_FIELD)
+        );
+        Map<String, List<String[]>> table = assertListSuccess(res);
+        assertPluginsAreCompatible(table,
+                TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE,
+                TestPlugins.TestPlugin.ALIASED_STATIC_FIELD);
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    public void testListOneWorkerConfig(PluginLocationType type) {
+        CommandResult res = runCommand(
+                "list",
+                "--worker-config",
+                setupWorkerConfig(workspace.resolve("worker.properties"),
+                        setupPluginPathElement(workspace.resolve("path-a"), type,
+                                TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED))
+        );
+        Map<String, List<String[]>> table = assertListSuccess(res);
+        assertBadPackagingPluginsPresent(table);
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    public void testListMultipleWorkerConfigs(PluginLocationType type) {
+        CommandResult res = runCommand(
+                "list",
+                "--worker-config",
+                setupWorkerConfig(workspace.resolve("worker-a.properties"),
+                        setupPluginPathElement(workspace.resolve("path-a"), type,
+                                TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN)),
+                "--worker-config",
+                setupWorkerConfig(workspace.resolve("worker-b.properties"),
+                        setupPluginPathElement(workspace.resolve("path-b"), type,
+                                TestPlugins.TestPlugin.SERVICE_LOADER))
+        );
+        Map<String, List<String[]>> table = assertListSuccess(res);
+        assertNonMigratedPluginsPresent(table);
+        assertPluginsAreCompatible(table,
+                TestPlugins.TestPlugin.SERVICE_LOADER);
+    }
+
+
+    private static Map<String, List<String[]>> assertListSuccess(CommandResult result) {
+        assertEquals(0, result.returnCode);
+        Map<String, List<String[]>> table = parseTable(result.out);
+        assertIsolatedPluginsInOutput(result.reflective, table);
+        return table;
+    }
+
+    private static void assertPluginsAreCompatible(Map<String, List<String[]>> table, TestPlugins.TestPlugin... plugins) {
+        assertPluginMigrationStatus(table, true, true, plugins);
+    }
+
+    private static void assertNonMigratedPluginsPresent(Map<String, List<String[]>> table) {
+        assertPluginMigrationStatus(table, true, false,
+                TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
+                TestPlugins.TestPlugin.NON_MIGRATED_HEADER_CONVERTER,
+                TestPlugins.TestPlugin.NON_MIGRATED_PREDICATE,
+                TestPlugins.TestPlugin.NON_MIGRATED_SINK_CONNECTOR,
+                TestPlugins.TestPlugin.NON_MIGRATED_SOURCE_CONNECTOR,
+                TestPlugins.TestPlugin.NON_MIGRATED_TRANSFORMATION);
+        // This plugin is partially compatible
+        assertPluginMigrationStatus(table, true, null,
+                TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
+    }
+
+    private static void assertBadPackagingPluginsPresent(Map<String, List<String[]>> table) {
+        assertPluginsAreCompatible(table,
+                TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED,
+                TestPlugins.TestPlugin.BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR);
+        assertPluginMigrationStatus(table, false, true,
+                TestPlugins.TestPlugin.BAD_PACKAGING_MISSING_SUPERCLASS,
+                TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_CONNECTOR,
+                TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR,
+                TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_PRIVATE_CONNECTOR,
+                TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONNECTOR,
+                TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONVERTER,
+                TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_OVERRIDE_POLICY,
+                TestPlugins.TestPlugin.BAD_PACKAGING_INNER_CLASS_CONNECTOR,
+                TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION);
+    }
+
+
+    private static void assertIsolatedPluginsInOutput(PluginScanResult reflectiveResult, Map<String, List<String[]>> table) {
+        reflectiveResult.forEach(pluginDesc -> {
+            if (pluginDesc.location().equals("classpath")) {
+                // Classpath plugins do not appear in list output
+                return;
+            }
+            assertTrue(table.containsKey(pluginDesc.className()), "Plugin " + pluginDesc.className() + " does not appear in list output");
+            boolean foundType = false;
+            for (String[] row : table.get(pluginDesc.className())) {
+                if (row[TYPE_COL].equals(pluginDesc.typeName())) {
+                    foundType = true;
+                    assertTrue(row[ALIAS1_COL].equals(ConnectPluginPath.NO_ALIAS) || row[ALIAS1_COL].equals(PluginUtils.simpleName(pluginDesc)));
+                    assertTrue(row[ALIAS2_COL].equals(ConnectPluginPath.NO_ALIAS) || row[ALIAS2_COL].equals(PluginUtils.prunedName(pluginDesc)));
+                    assertEquals(pluginDesc.version(), row[VERSION_COL]);
+                    try {
+                        Path pluginLocation = Paths.get(row[LOCATION_COL]);
+                        // This transforms the raw path `/path/to/somewhere` to the url `file:/path/to/somewhere`
+                        String pluginLocationUrl = pluginLocation.toUri().toURL().toString();
+                        assertEquals(pluginDesc.location(), pluginLocationUrl);
+                    } catch (MalformedURLException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            assertTrue(foundType, "Plugin " + pluginDesc.className() + " does not have row for " + pluginDesc.typeName());
+        });
+    }
+
+    private static void assertPluginMigrationStatus(Map<String, List<String[]>> table, Boolean loadable, Boolean compatible, TestPlugins.TestPlugin... plugins) {
+        for (TestPlugins.TestPlugin plugin : plugins) {
+            assertTrue(table.containsKey(plugin.className()), "Plugin " + plugin.className() + " does not appear in list output");
+            for (String[] row : table.get(plugin.className())) {
+                log.info("row: {}", Arrays.toString(row));
+                if (loadable != null) {
+                    assertEquals(loadable, Boolean.parseBoolean(row[LOADABLE_COL]), "Plugin loadable column for " + plugin.className() + " incorrect");
+                }
+                if (compatible != null) {
+                    assertEquals(compatible, Boolean.parseBoolean(row[MANIFEST_COL]), "Plugin hasManifest column for " + plugin.className() + " incorrect");
+                }
+            }
+        }
+    }
+
+    private enum PluginLocationType {
+        CLASS_HIERARCHY,
+        SINGLE_JAR,
+        MULTI_JAR
+    }
+
+    private static class PluginLocation {
+        private final Path path;
+
+        private PluginLocation(Path path) {
+            this.path = path;
+        }
+
+        @Override
+        public String toString() {
+            return path.toString();
+        }
+    }
+
+    /**
+     * Populate a writable disk path to be usable as a single plugin location.
+     * The returned path can be passed directly as a single plugin location argument.
+     * @param path A non-existent path immediately within a writable directory, suggesting a location for this plugin.
+     * @param type The format to which the on-disk plugin should conform
+     * @param plugin The plugin which should be written to the specified path
+     * @return The final usable path name to this location, in case it is different from the suggested input path.
+     */
+    private static PluginLocation setupLocation(Path path, PluginLocationType type, TestPlugins.TestPlugin plugin) {
+        try {
+            Path jarPath = TestPlugins.pluginPath(plugin).stream().findFirst().get();
+            switch (type) {
+                case CLASS_HIERARCHY: {
+                    try (JarFile jarFile = new JarFile(jarPath.toFile())) {
+                        jarFile.stream().forEach(jarEntry -> {
+                            Path entryPath = path.resolve(jarEntry.getName());
+                            try {
+                                entryPath.getParent().toFile().mkdirs();
+                                Files.copy(jarFile.getInputStream(jarEntry), entryPath, StandardCopyOption.REPLACE_EXISTING);
+                            } catch (IOException e) {
+                                throw new UncheckedIOException(e);
+                            }
+                        });
+                    }
+                    return new PluginLocation(path);
+                }
+                case SINGLE_JAR: {
+                    Path outputJar = path.resolveSibling(path.getFileName() + ".jar");
+                    outputJar.getParent().toFile().mkdirs();
+                    Files.copy(jarPath, outputJar, StandardCopyOption.REPLACE_EXISTING);
+                    return new PluginLocation(outputJar);
+                }
+                case MULTI_JAR: {
+                    Path outputJar = path.resolve(jarPath.getFileName());
+                    outputJar.getParent().toFile().mkdirs();
+                    Files.copy(jarPath, outputJar, StandardCopyOption.REPLACE_EXISTING);
+                    return new PluginLocation(path);
+                }
+                default:
+                    throw new IllegalArgumentException("Unknown PluginLocationType");
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private static class PluginPathElement {
+        private final Path root;
+        private final List<PluginLocation> locations;
+
+        private PluginPathElement(Path root, List<PluginLocation> locations) {
+            this.root = root;
+            this.locations = locations;
+        }
+
+        @Override
+        public String toString() {
+            return root.toString();
+        }
+    }
+
+    /**
+     * Populate a writable disk path to be usable as a single {@code plugin.path} element providing the specified plugins
+     * @param path A directory that should contain the populated plugins, will be created if it does not exist.
+     * @param type The format to which the on-disk plugins should conform
+     * @param plugins The plugins which should be written to the specified path
+     * @return The specific inner locations of the plugins that were written.
+     */
+    private PluginPathElement setupPluginPathElement(Path path, PluginLocationType type, TestPlugins.TestPlugin... plugins) {
+        List<PluginLocation> locations = new ArrayList<>();
+        for (int i = 0; i < plugins.length; i++) {
+            TestPlugins.TestPlugin plugin = plugins[i];
+            locations.add(setupLocation(path.resolve("plugin-" + i), type, plugin));
+        }
+        return new PluginPathElement(path, locations);
+    }
+
+    private static class WorkerConfig {
+        private final Path configFile;
+        private final List<PluginPathElement> pluginPathElements;
+
+        private WorkerConfig(Path configFile, List<PluginPathElement> pluginPathElements) {
+            this.configFile = configFile;
+            this.pluginPathElements = pluginPathElements;
+        }
+
+        @Override
+        public String toString() {
+            return configFile.toString();
+        }
+    }
+
+    /**
+     * Populate a writable disk path with a worker config file whose {@code plugin.path} references the given elements
+     * @param path The file to which the worker config should be written, within a writable directory
+     * @param pluginPathElements The plugin path elements to include in the {@code plugin.path} property
+     * @return The written worker config file together with the plugin path elements it references
+     */
+    private static WorkerConfig setupWorkerConfig(Path path, PluginPathElement... pluginPathElements) {
+        path.getParent().toFile().mkdirs();
+        Properties properties = new Properties();
+        String pluginPath = Arrays.stream(pluginPathElements)
+                .map(Object::toString)
+                .collect(Collectors.joining(", "));
+        properties.setProperty("plugin.path", pluginPath);
+        try (OutputStream outputStream = Files.newOutputStream(path)) {
+            properties.store(outputStream, "dummy worker properties file");
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        return new WorkerConfig(path, Arrays.asList(pluginPathElements));
+    }
+
+    private static class CommandResult {
+        public CommandResult(int returnCode, String out, String err, PluginScanResult reflective, PluginScanResult serviceLoading) {
+            this.returnCode = returnCode;
+            this.out = out;
+            this.err = err;
+            this.reflective = reflective;
+            this.serviceLoading = serviceLoading;
+        }
+
+        int returnCode;
+        String out;
+        String err;
+        PluginScanResult reflective;
+        PluginScanResult serviceLoading;
+    }
+
+    private static CommandResult runCommand(Object... args) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayOutputStream err = new ByteArrayOutputStream();
+        try {
+            int returnCode = ConnectPluginPath.mainNoExit(
+                    Arrays.stream(args)
+                            .map(Object::toString)
+                            .collect(Collectors.toList())
+                            .toArray(new String[]{}),
+                    new PrintStream(out, true, "utf-8"),
+                    new PrintStream(err, true, "utf-8"));
+            Set<Path> pluginLocations = getPluginLocations(args);
+            ClassLoader parent = ConnectPluginPath.class.getClassLoader();
+            ClassLoaderFactory factory = new ClassLoaderFactory();
+            try (DelegatingClassLoader delegatingClassLoader = factory.newDelegatingClassLoader(parent)) {
+                Set<PluginSource> sources = PluginUtils.pluginSources(pluginLocations, delegatingClassLoader, factory);
+                String stdout = new String(out.toByteArray(), StandardCharsets.UTF_8);
+                String stderr = new String(err.toByteArray(), StandardCharsets.UTF_8);
+                log.info("STDOUT:\n{}", stdout);
+                log.info("STDERR:\n{}", stderr);
+                return new CommandResult(
+                        returnCode,
+                        stdout,
+                        stderr,
+                        new ReflectionScanner().discoverPlugins(sources),
+                        new ServiceLoaderScanner().discoverPlugins(sources)
+                );
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Set<Path> getPluginLocations(Object[] args) {
+        return Arrays.stream(args)
+                .flatMap(obj -> {
+                    if (obj instanceof WorkerConfig) {
+                        return ((WorkerConfig) obj).pluginPathElements.stream();
+                    } else {
+                        return Stream.of(obj);
+                    }
+                })
+                .flatMap(obj -> {
+                    if (obj instanceof PluginPathElement) {
+                        return ((PluginPathElement) obj).locations.stream();
+                    } else {
+                        return Stream.of(obj);
+                    }
+                })
+                .map(obj -> {
+                    if (obj instanceof PluginLocation) {
+                        return ((PluginLocation) obj).path;
+                    } else {
+                        return null;
+                    }
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+    }
+
+
+    /**
+     * Parse the main table of the list command.
+     * <p>The returned map is keyed on the plugin name, with a list of the rows that refer to that name if there are multiple.
+     * Each row is pre-split into columns.
+     * @param listOutput The output of an executed list command
+     * @return A parsed form of the table grouped by plugin class names
+     */
+    private static Map<String, List<String[]>> parseTable(String listOutput) {
+        // Split on the empty line which should appear in the output.
+        String[] sections = listOutput.split("\n\\s*\n");
+        assertTrue(sections.length > 1, "No empty line in list output");
+        String[] rows = sections[0].split("\n");
+        Map<String, List<String[]>> table = new HashMap<>();
+        // Assert that the first row is the header
+        assertArrayEquals(ConnectPluginPath.LIST_TABLE_COLUMNS, rows[0].split("\t"), "Table header doesn't have the right columns");
+        // Skip the header to parse the rows in the table.
+        for (int i = 1; i < rows.length; i++) {
+            // group rows by the value of the name column
+            String[] row = rows[i].split("\t");
+            assertEquals(ConnectPluginPath.LIST_TABLE_COLUMNS.length, row.length, "Table row is the wrong length");
+            table.computeIfAbsent(row[NAME_COL], ignored -> new ArrayList<>()).add(row);
+        }
+        return table;
+    }
+}
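For reference, a minimal sketch (not part of the patch) of driving the new tool in-process, mirroring how ConnectPluginPathTest.runCommand() captures output. It assumes mainNoExit remains reachable from the org.apache.kafka.tools package, as the test relies on; the class name ConnectPluginPathListSketch and the /tmp/connect-plugins path are placeholders for illustration only.

    package org.apache.kafka.tools;

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;
    import java.nio.charset.StandardCharsets;

    public class ConnectPluginPathListSketch {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            ByteArrayOutputStream err = new ByteArrayOutputStream();
            // Run "list" against a placeholder plugin path, capturing stdout and stderr in-process.
            int returnCode = ConnectPluginPath.mainNoExit(
                    new String[]{"list", "--plugin-path", "/tmp/connect-plugins"},
                    new PrintStream(out, true, "utf-8"),
                    new PrintStream(err, true, "utf-8"));
            System.out.println("exit code: " + returnCode);
            // The first section of the output is a tab-separated table whose header row matches LIST_TABLE_COLUMNS.
            System.out.println(new String(out.toByteArray(), StandardCharsets.UTF_8));
        }
    }

parseTable above depends on that output shape: a tab-separated header row matching LIST_TABLE_COLUMNS, one row per plugin and type, and a blank line terminating the table.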