Browse Source

KAFKA-6503: Parallelize plugin scanning

This is a small change to parallelize plugin scanning.  This may help in some environments where otherwise plugin scanning is slow.

Author: Robert Yokota <rayokota@gmail.com>

Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #4561 from rayokota/K6503-improve-plugin-scanning
pull/4472/merge
Robert Yokota 7 years ago committed by Ewen Cheslack-Postava
parent
commit
3af13967db
  1. 29
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java

29
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java

@ -20,7 +20,10 @@ import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.reflections.Configuration;
import org.reflections.Reflections; import org.reflections.Reflections;
import org.reflections.ReflectionsException;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ClasspathHelper; import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder; import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -269,7 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader {
ConfigurationBuilder builder = new ConfigurationBuilder(); ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader}); builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(urls); builder.addUrls(urls);
Reflections reflections = new Reflections(builder); builder.setScanners(new SubTypesScanner());
builder.setExpandSuperTypes(false);
builder.useParallelExecutor();
Reflections reflections = new InternalReflections(builder);
return new PluginScanResult( return new PluginScanResult(
getPluginDesc(reflections, Connector.class, loader), getPluginDesc(reflections, Connector.class, loader),
@ -353,4 +359,25 @@ public class DelegatingClassLoader extends URLClassLoader {
} }
} }
} }
private static class InternalReflections extends Reflections {
public InternalReflections(Configuration configuration) {
super(configuration);
}
// When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
// as RuntimeException. Override the scan behavior to emulate the singled-threaded logic.
@Override
protected void scan(URL url) {
try {
super.scan(url);
} catch (ReflectionsException e) {
Logger log = Reflections.log;
if (log != null && log.isWarnEnabled()) {
log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
}
}
}
}
} }

Loading…
Cancel
Save