Browse Source

KAFKA-15162: Reflectively find plugins in parent ClassLoaders that aren't on the classpath (#13977)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewed-by: Chris Egerton <chris.egerton@aiven.io>
pull/14061/head
Greg Harris 1 year ago committed by GitHub
parent
commit
d5a00cca74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
  2. 7
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
  3. 12
      connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java
  4. 12
      connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java

@ -350,8 +350,10 @@ public class PluginUtils { @@ -350,8 +350,10 @@ public class PluginUtils {
log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
}
}
URL[] classpathUrls = ClasspathHelper.forJavaClassPath().toArray(new URL[0]);
pluginSources.add(new PluginSource(null, classLoader.getParent(), classpathUrls));
List<URL> parentUrls = new ArrayList<>();
parentUrls.addAll(ClasspathHelper.forJavaClassPath());
parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent()));
pluginSources.add(new PluginSource(null, classLoader.getParent(), parentUrls.toArray(new URL[0])));
return pluginSources;
}

7
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java

@ -477,7 +477,7 @@ public class PluginsTest { @@ -477,7 +477,7 @@ public class PluginsTest {
}
private void assertClassLoaderReadsVersionFromResource(
TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) throws MalformedURLException {
TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) {
URL[] systemPath = TestPlugins.pluginPath(parentResource)
.stream()
.map(Path::toFile)
@ -500,6 +500,11 @@ public class PluginsTest { @@ -500,6 +500,11 @@ public class PluginsTest {
);
plugins = new Plugins(pluginProps, parent, new ClassLoaderFactory());
assertTrue("Should find plugin in plugin classloader",
plugins.converters().stream().anyMatch(desc -> desc.loader() instanceof PluginClassLoader));
assertTrue("Should find plugin in parent classloader",
plugins.converters().stream().anyMatch(desc -> parent.equals(desc.loader())));
Converter converter = plugins.newPlugin(
className,
new AbstractConfig(new ConfigDef(), Collections.emptyMap()),

12
connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java

@ -31,6 +31,7 @@ import java.net.URL; @@ -31,6 +31,7 @@ import java.net.URL;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.components.Versioned;
/**
* Fake plugin class for testing classloading isolation
@ -39,7 +40,7 @@ import org.apache.kafka.connect.storage.Converter; @@ -39,7 +40,7 @@ import org.apache.kafka.connect.storage.Converter;
* Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, Schema, Object)}
* and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
*/
public class ReadVersionFromResource implements Converter {
public class ReadVersionFromResource implements Converter, Versioned {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
@ -78,4 +79,13 @@ public class ReadVersionFromResource implements Converter { @@ -78,4 +79,13 @@ public class ReadVersionFromResource implements Converter {
throw new AssertionError(e);
}
}
@Override
public String version() {
try (InputStream stream = this.getClass().getResourceAsStream("/version")) {
return version(stream);
} catch (IOException e) {
throw new AssertionError(e);
}
}
}

12
connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java

@ -31,6 +31,7 @@ import java.net.URL; @@ -31,6 +31,7 @@ import java.net.URL;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.components.Versioned;
/**
* Fake plugin class for testing classloading isolation.
@ -39,7 +40,7 @@ import org.apache.kafka.connect.storage.Converter; @@ -39,7 +40,7 @@ import org.apache.kafka.connect.storage.Converter;
* Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, Schema, Object)}
* and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
*/
public class ReadVersionFromResource implements Converter {
public class ReadVersionFromResource implements Converter, Versioned {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
@ -78,4 +79,13 @@ public class ReadVersionFromResource implements Converter { @@ -78,4 +79,13 @@ public class ReadVersionFromResource implements Converter {
throw new AssertionError(e);
}
}
@Override
public String version() {
try (InputStream stream = this.getClass().getResourceAsStream("/version")) {
return version(stream);
} catch (IOException e) {
throw new AssertionError(e);
}
}
}
Loading…
Cancel
Save