> result = new TreeSet<>();
for (Class extends T> pluginKlass : plugins) {
if (!PluginUtils.isConcrete(pluginKlass)) {
- log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source.location());
+ log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source);
continue;
}
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
- pluginKlass, pluginKlass.getClassLoader(), source.location());
+ pluginKlass, pluginKlass.getClassLoader(), source);
continue;
}
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), type, source));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {} in {}: Unable to instantiate {}{}",
- type.simpleName(), source.location(), pluginKlass.getSimpleName(),
+ type.simpleName(), source, pluginKlass.getSimpleName(),
reflectiveErrorDescription(e), e);
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
index d93fef181aa..d6fd3cd54bc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java
@@ -125,7 +125,7 @@ public class TestPlugins {
/**
* A plugin which is incorrectly packaged, and is missing a superclass definition.
*/
- BAD_PACKAGING_MISSING_SUPERCLASS("bad-packaging", "test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER),
+ BAD_PACKAGING_MISSING_SUPERCLASS("bad-packaging", "test.plugins.MissingSuperclassConverter", false, REMOVE_CLASS_FILTER),
/**
* A plugin which is packaged with other incorrectly packaged plugins, but itself has no issues loading.
*/
@@ -173,7 +173,35 @@ public class TestPlugins {
/**
* A ServiceLoader discovered plugin which subclasses another plugin which is present on the classpath
*/
- SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY("subclass-of-classpath", "test.plugins.SubclassOfClasspathOverridePolicy");
+ SUBCLASS_OF_CLASSPATH_OVERRIDE_POLICY("subclass-of-classpath", "test.plugins.SubclassOfClasspathOverridePolicy"),
+ /**
+ * A converter which does not have a corresponding ServiceLoader manifest
+ */
+ NON_MIGRATED_CONVERTER("non-migrated", "test.plugins.NonMigratedConverter", false),
+ /**
+ * A header converter which does not have a corresponding ServiceLoader manifest
+ */
+ NON_MIGRATED_HEADER_CONVERTER("non-migrated", "test.plugins.NonMigratedHeaderConverter", false),
+ /**
+ * A plugin which implements multiple interfaces, and has ServiceLoader manifests for some interfaces and not others.
+ */
+ NON_MIGRATED_MULTI_PLUGIN("non-migrated", "test.plugins.NonMigratedMultiPlugin", false),
+ /**
+ * A predicate which does not have a corresponding ServiceLoader manifest
+ */
+ NON_MIGRATED_PREDICATE("non-migrated", "test.plugins.NonMigratedPredicate", false),
+ /**
+ * A sink connector which does not have a corresponding ServiceLoader manifest
+ */
+ NON_MIGRATED_SINK_CONNECTOR("non-migrated", "test.plugins.NonMigratedSinkConnector", false),
+ /**
+ * A source connector which does not have a corresponding ServiceLoader manifest
+ */
+ NON_MIGRATED_SOURCE_CONNECTOR("non-migrated", "test.plugins.NonMigratedSourceConnector", false),
+ /**
+ * A transformation which does not have a corresponding ServiceLoader manifest
+ */
+ NON_MIGRATED_TRANSFORMATION("non-migrated", "test.plugins.NonMigratedTransformation", false);
private final String resourceDir;
private final String className;
diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy b/connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
new file mode 100644
index 00000000000..82400f7255d
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+test.plugins.NonMigratedMultiPlugin
diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
new file mode 100644
index 00000000000..d2be39384ad
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedConverter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+import java.util.Map;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * Class which is not migrated to include a service loader manifest.
+ */
+public final class NonMigratedConverter implements Converter {
+
+ @Override
+ public void configure(final Map configs, final boolean isKey) {
+ }
+
+ @Override
+ public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
+ return new byte[0];
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(final String topic, final byte[] value) {
+ return null;
+ }
+}
diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
new file mode 100644
index 00000000000..9e013303a56
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedHeaderConverter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * Class which is not migrated to include a service loader manifest.
+ */
+public class NonMigratedHeaderConverter implements HeaderConverter {
+
+ @Override
+ public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+ return null;
+ }
+
+ @Override
+ public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+ return new byte[0];
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(Map configs) {
+ }
+}
diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
new file mode 100644
index 00000000000..a82b82f2974
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedMultiPlugin.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * Class which is not migrated to include a service loader manifest.
+ */
+public final class NonMigratedMultiPlugin implements Converter, HeaderConverter, Predicate, Transformation, ConnectorClientConfigOverridePolicy {
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+
+ }
+
+ @Override
+ public byte[] fromConnectData(String topic, Schema schema, Object value) {
+ return new byte[0];
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(String topic, byte[] value) {
+ return null;
+ }
+
+ @Override
+ public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+ return null;
+ }
+
+ @Override
+ public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+ return new byte[0];
+ }
+
+ @Override
+ public ConnectRecord apply(ConnectRecord record) {
+ return null;
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+
+ @Override
+ public boolean test(ConnectRecord record) {
+ return false;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(Map configs) {
+
+ }
+
+ @Override
+ public List validate(ConnectorClientConfigRequest connectorClientConfigRequest) {
+ return null;
+ }
+}
diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
new file mode 100644
index 00000000000..3465d0b4633
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedPredicate.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+import java.util.Map;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * Class which is not migrated to include a service loader manifest.
+ */
+public class NonMigratedPredicate implements Predicate {
+
+ @Override
+ public void configure(Map configs) {
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+
+ @Override
+ public boolean test(ConnectRecord record) {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java
new file mode 100644
index 00000000000..b9678a3382f
--- /dev/null
+++ b/connect/runtime/src/test/resources/test-plugins/non-migrated/test/plugins/NonMigratedSinkConnector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * Class which is not migrated to include a service loader manifest.
+ */
+public class NonMigratedSinkConnector extends SinkConnector {
+
+ @Override
+ public void start(Map props) {
+ }
+
+ @Override
+ public Class extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List