Browse Source

KAFKA-15228: Add sync-manifests command to connect-plugin-path (KIP-898) (#14195)

Reviewers: Chris Egerton <chrise@aiven.io>
pull/13913/merge
Greg Harris 1 year ago committed by GitHub
parent
commit
6bd17419b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java
  2. 18
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
  3. 258
      tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
  4. 603
      tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
  5. 219
      tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java

12
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java

@ -23,12 +23,18 @@ import java.util.Objects; @@ -23,12 +23,18 @@ import java.util.Objects;
public class PluginSource {
public enum Type {
CLASSPATH, MULTI_JAR, SINGLE_JAR, CLASS_HIERARCHY
}
private final Path location;
private final Type type;
private final ClassLoader loader;
private final URL[] urls;
public PluginSource(Path location, ClassLoader loader, URL[] urls) {
public PluginSource(Path location, Type type, ClassLoader loader, URL[] urls) {
this.location = location;
this.type = type;
this.loader = loader;
this.urls = urls;
}
@ -37,6 +43,10 @@ public class PluginSource { @@ -37,6 +43,10 @@ public class PluginSource {
return location;
}
public Type type() {
return type;
}
public ClassLoader loader() {
return loader;
}

18
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java

@ -356,7 +356,19 @@ public class PluginUtils { @@ -356,7 +356,19 @@ public class PluginUtils {
public static PluginSource isolatedPluginSource(Path pluginLocation, ClassLoader parent, PluginClassLoaderFactory factory) throws IOException {
List<URL> pluginUrls = new ArrayList<>();
for (Path path : pluginUrls(pluginLocation)) {
List<Path> paths = pluginUrls(pluginLocation);
// Infer the type of the source
PluginSource.Type type;
if (paths.size() == 1 && paths.get(0) == pluginLocation) {
if (PluginUtils.isArchive(pluginLocation)) {
type = PluginSource.Type.SINGLE_JAR;
} else {
type = PluginSource.Type.CLASS_HIERARCHY;
}
} else {
type = PluginSource.Type.MULTI_JAR;
}
for (Path path : paths) {
pluginUrls.add(path.toUri().toURL());
}
URL[] urls = pluginUrls.toArray(new URL[0]);
@ -365,14 +377,14 @@ public class PluginUtils { @@ -365,14 +377,14 @@ public class PluginUtils {
urls,
parent
);
return new PluginSource(pluginLocation, loader, urls);
return new PluginSource(pluginLocation, type, loader, urls);
}
public static PluginSource classpathPluginSource(ClassLoader classLoader) {
List<URL> parentUrls = new ArrayList<>();
parentUrls.addAll(ClasspathHelper.forJavaClassPath());
parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader));
return new PluginSource(null, classLoader, parentUrls.toArray(new URL[0]));
return new PluginSource(null, PluginSource.Type.CLASSPATH, classLoader, parentUrls.toArray(new URL[0]));
}
/**

258
tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java

@ -35,23 +35,16 @@ import org.apache.kafka.connect.runtime.isolation.PluginUtils; @@ -35,23 +35,16 @@ import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@ -64,8 +57,6 @@ import java.util.stream.Collectors; @@ -64,8 +57,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ConnectPluginPath {
private static final String MANIFEST_PREFIX = "META-INF/services/";
public static final Object[] LIST_TABLE_COLUMNS = {
"pluginName",
"firstAlias",
@ -86,7 +77,7 @@ public class ConnectPluginPath { @@ -86,7 +77,7 @@ public class ConnectPluginPath {
ArgumentParser parser = parser();
try {
Namespace namespace = parser.parseArgs(args);
Config config = parseConfig(parser, namespace, out);
Config config = parseConfig(parser, namespace, out, err);
runCommand(config);
return 0;
} catch (ArgumentParserException e) {
@ -96,8 +87,8 @@ public class ConnectPluginPath { @@ -96,8 +87,8 @@ public class ConnectPluginPath {
err.println(e.getMessage());
return 2;
} catch (Throwable e) {
err.println(e.getMessage());
err.println(Utils.stackTrace(e));
err.println(e.getMessage());
return 3;
}
}
@ -112,8 +103,14 @@ public class ConnectPluginPath { @@ -112,8 +103,14 @@ public class ConnectPluginPath {
.dest("subcommand")
.addParser("list");
ArgumentParser syncManifestsCommand = parser.addSubparsers()
.description("Mutate the specified plugins to be compatible with plugin.discovery=SERVICE_LOAD mode")
.dest("subcommand")
.addParser("sync-manifests");
ArgumentParser[] subparsers = new ArgumentParser[] {
listCommand,
syncManifestsCommand
};
for (ArgumentParser subparser : subparsers) {
@ -134,10 +131,18 @@ public class ConnectPluginPath { @@ -134,10 +131,18 @@ public class ConnectPluginPath {
.help("A Connect worker configuration file");
}
syncManifestsCommand.addArgument("--dry-run")
.action(Arguments.storeTrue())
.help("If specified, changes that would have been written to disk are not applied");
syncManifestsCommand.addArgument("--keep-not-found")
.action(Arguments.storeTrue())
.help("If specified, manifests for missing plugins are not removed from the plugin path");
return parser;
}
private static Config parseConfig(ArgumentParser parser, Namespace namespace, PrintStream out) throws ArgumentParserException, TerseException {
private static Config parseConfig(ArgumentParser parser, Namespace namespace, PrintStream out, PrintStream err) throws ArgumentParserException, TerseException {
Set<Path> locations = parseLocations(parser, namespace);
String subcommand = namespace.getString("subcommand");
if (subcommand == null) {
@ -145,7 +150,9 @@ public class ConnectPluginPath { @@ -145,7 +150,9 @@ public class ConnectPluginPath {
}
switch (subcommand) {
case "list":
return new Config(Command.LIST, locations, out);
return new Config(Command.LIST, locations, false, false, out, err);
case "sync-manifests":
return new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err);
default:
throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser);
}
@ -189,18 +196,24 @@ public class ConnectPluginPath { @@ -189,18 +196,24 @@ public class ConnectPluginPath {
}
enum Command {
LIST
LIST, SYNC_MANIFESTS;
}
private static class Config {
private final Command command;
private final Set<Path> locations;
private final boolean dryRun;
private final boolean keepNotFound;
private final PrintStream out;
private final PrintStream err;
private Config(Command command, Set<Path> locations, PrintStream out) {
private Config(Command command, Set<Path> locations, boolean dryRun, boolean keepNotFound, PrintStream out, PrintStream err) {
this.command = command;
this.locations = locations;
this.dryRun = dryRun;
this.keepNotFound = keepNotFound;
this.out = out;
this.err = err;
}
@Override
@ -208,21 +221,23 @@ public class ConnectPluginPath { @@ -208,21 +221,23 @@ public class ConnectPluginPath {
return "Config{" +
"command=" + command +
", locations=" + locations +
", dryRun=" + dryRun +
", keepNotFound=" + keepNotFound +
'}';
}
}
public static void runCommand(Config config) throws TerseException {
try {
ManifestWorkspace workspace = new ManifestWorkspace(config.out);
ClassLoader parent = ConnectPluginPath.class.getClassLoader();
ServiceLoaderScanner serviceLoaderScanner = new ServiceLoaderScanner();
ReflectionScanner reflectionScanner = new ReflectionScanner();
// Process the contents of the classpath to exclude it from later results.
PluginSource classpathSource = PluginUtils.classpathPluginSource(parent);
Map<String, List<ManifestEntry>> classpathManifests = findManifests(classpathSource, Collections.emptyMap());
ManifestWorkspace.SourceWorkspace<?> classpathWorkspace = workspace.forSource(classpathSource);
PluginScanResult classpathPlugins = discoverPlugins(classpathSource, reflectionScanner, serviceLoaderScanner);
Map<Path, Set<Row>> rowsByLocation = new LinkedHashMap<>();
Set<Row> classpathRows = enumerateRows(null, classpathManifests, classpathPlugins);
Set<Row> classpathRows = enumerateRows(classpathWorkspace, classpathPlugins);
rowsByLocation.put(null, classpathRows);
ClassLoaderFactory factory = new ClassLoaderFactory();
@ -230,18 +245,18 @@ public class ConnectPluginPath { @@ -230,18 +245,18 @@ public class ConnectPluginPath {
beginCommand(config);
for (Path pluginLocation : config.locations) {
PluginSource source = PluginUtils.isolatedPluginSource(pluginLocation, delegatingClassLoader, factory);
Map<String, List<ManifestEntry>> manifests = findManifests(source, classpathManifests);
ManifestWorkspace.SourceWorkspace<?> pluginWorkspace = workspace.forSource(source);
PluginScanResult plugins = discoverPlugins(source, reflectionScanner, serviceLoaderScanner);
Set<Row> rows = enumerateRows(pluginLocation, manifests, plugins);
Set<Row> rows = enumerateRows(pluginWorkspace, plugins);
rowsByLocation.put(pluginLocation, rows);
for (Row row : rows) {
handlePlugin(config, row);
}
}
endCommand(config, rowsByLocation);
endCommand(config, workspace, rowsByLocation);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (Throwable e) {
failCommand(config, e);
}
}
@ -251,7 +266,7 @@ public class ConnectPluginPath { @@ -251,7 +266,7 @@ public class ConnectPluginPath {
* that pertains to this specific plugin.
*/
private static class Row {
private final Path pluginLocation;
private final ManifestWorkspace.SourceWorkspace<?> workspace;
private final String className;
private final PluginType type;
private final String version;
@ -259,8 +274,8 @@ public class ConnectPluginPath { @@ -259,8 +274,8 @@ public class ConnectPluginPath {
private final boolean loadable;
private final boolean hasManifest;
public Row(Path pluginLocation, String className, PluginType type, String version, List<String> aliases, boolean loadable, boolean hasManifest) {
this.pluginLocation = pluginLocation;
public Row(ManifestWorkspace.SourceWorkspace<?> workspace, String className, PluginType type, String version, List<String> aliases, boolean loadable, boolean hasManifest) {
this.workspace = Objects.requireNonNull(workspace, "workspace must be non-null");
this.className = Objects.requireNonNull(className, "className must be non-null");
this.version = Objects.requireNonNull(version, "version must be non-null");
this.type = Objects.requireNonNull(type, "type must be non-null");
@ -277,11 +292,8 @@ public class ConnectPluginPath { @@ -277,11 +292,8 @@ public class ConnectPluginPath {
return loadable && hasManifest;
}
private boolean incompatible() {
return !compatible();
}
private String locationString() {
Path pluginLocation = workspace.location();
return pluginLocation == null ? "classpath" : pluginLocation.toString();
}
@ -290,40 +302,41 @@ public class ConnectPluginPath { @@ -290,40 +302,41 @@ public class ConnectPluginPath {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Row row = (Row) o;
return Objects.equals(pluginLocation, row.pluginLocation) && className.equals(row.className) && type == row.type;
return Objects.equals(workspace, row.workspace) && className.equals(row.className) && type == row.type;
}
@Override
public int hashCode() {
return Objects.hash(pluginLocation, className, type);
return Objects.hash(workspace, className, type);
}
}
private static Set<Row> enumerateRows(Path pluginLocation, Map<String, List<ManifestEntry>> manifests, PluginScanResult scanResult) {
private static Set<Row> enumerateRows(ManifestWorkspace.SourceWorkspace<?> workspace, PluginScanResult scanResult) {
Set<Row> rows = new HashSet<>();
// Perform a deep copy of the manifests because we're going to be mutating our copy.
Map<String, Set<ManifestEntry>> unloadablePlugins = manifests.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue())));
Map<String, Set<PluginType>> nonLoadableManifests = new HashMap<>();
workspace.forEach((className, type) -> {
// Mark all manifests in the workspace as non-loadable first
nonLoadableManifests.computeIfAbsent(className, ignored -> EnumSet.of(type)).add(type);
});
scanResult.forEach(pluginDesc -> {
// Emit a loadable row for this scan result, since it was found during plugin discovery
// Only loadable plugins appear in the scan result
Set<String> rowAliases = new LinkedHashSet<>();
rowAliases.add(PluginUtils.simpleName(pluginDesc));
rowAliases.add(PluginUtils.prunedName(pluginDesc));
rows.add(newRow(pluginLocation, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true, manifests));
// Remove the ManifestEntry if it has the same className and type as one of the loadable plugins.
unloadablePlugins.getOrDefault(pluginDesc.className(), Collections.emptySet()).removeIf(entry -> entry.type == pluginDesc.type());
rows.add(newRow(workspace, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true));
// If a corresponding manifest exists, mark it as loadable by removing it from the map.
nonLoadableManifests.getOrDefault(pluginDesc.className(), Collections.emptySet()).remove(pluginDesc.type());
});
unloadablePlugins.values().forEach(entries -> entries.forEach(entry -> {
// Emit a non-loadable row, since all the loadable rows showed up in the previous iteration.
// Two ManifestEntries may produce the same row if they have different URIs
rows.add(newRow(pluginLocation, entry.className, Collections.emptyList(), entry.type, PluginDesc.UNDEFINED_VERSION, false, manifests));
nonLoadableManifests.forEach((className, types) -> types.forEach(type -> {
// All manifests which remain in the map are not loadable
rows.add(newRow(workspace, className, Collections.emptyList(), type, PluginDesc.UNDEFINED_VERSION, false));
}));
return rows;
}
private static Row newRow(Path pluginLocation, String className, List<String> rowAliases, PluginType type, String version, boolean loadable, Map<String, List<ManifestEntry>> manifests) {
boolean hasManifest = manifests.containsKey(className) && manifests.get(className).stream().anyMatch(e -> e.type == type);
return new Row(pluginLocation, className, type, version, rowAliases, loadable, hasManifest);
private static Row newRow(ManifestWorkspace.SourceWorkspace<?> workspace, String className, List<String> rowAliases, PluginType type, String version, boolean loadable) {
boolean hasManifest = workspace.hasManifest(type, className);
return new Row(workspace, className, type, version, rowAliases, loadable, hasManifest);
}
private static void beginCommand(Config config) {
@ -332,6 +345,11 @@ public class ConnectPluginPath { @@ -332,6 +345,11 @@ public class ConnectPluginPath {
// This is officially human-readable output with no guarantees for backwards-compatibility
// It should be reasonably easy to parse for ad-hoc scripting use-cases.
listTablePrint(config, LIST_TABLE_COLUMNS);
} else if (config.command == Command.SYNC_MANIFESTS) {
if (config.dryRun) {
config.out.println("Dry run started: No changes will be committed.");
}
config.out.println("Scanning for plugins...");
}
}
@ -350,13 +368,20 @@ public class ConnectPluginPath { @@ -350,13 +368,20 @@ public class ConnectPluginPath {
// last because it is least important and most repetitive
row.locationString()
);
} else if (config.command == Command.SYNC_MANIFESTS) {
if (row.loadable && !row.hasManifest) {
row.workspace.addManifest(row.type, row.className);
} else if (!row.loadable && row.hasManifest && !config.keepNotFound) {
row.workspace.removeManifest(row.type, row.className);
}
}
}
private static void endCommand(
Config config,
ManifestWorkspace workspace,
Map<Path, Set<Row>> rowsByLocation
) {
) throws IOException, TerseException {
if (config.command == Command.LIST) {
// end the table with an empty line to enable users to separate the table from the summary.
config.out.println();
@ -368,6 +393,35 @@ public class ConnectPluginPath { @@ -368,6 +393,35 @@ public class ConnectPluginPath {
config.out.printf("Total plugins: \t%d%n", totalPlugins);
config.out.printf("Loadable plugins: \t%d%n", loadablePlugins);
config.out.printf("Compatible plugins: \t%d%n", compatiblePlugins);
} else if (config.command == Command.SYNC_MANIFESTS) {
if (workspace.commit(true)) {
if (config.dryRun) {
config.out.println("Dry run passed: All above changes can be committed to disk if re-run with dry run disabled.");
} else {
config.out.println("Writing changes to plugins...");
try {
workspace.commit(false);
} catch (Throwable t) {
config.err.println(Utils.stackTrace(t));
throw new TerseException("Sync incomplete due to exception; plugin path may be corrupted. Discard the contents of the plugin.path before retrying.");
}
config.out.println("All loadable plugins have accurate ServiceLoader manifests.");
}
} else {
config.out.println("No changes required.");
}
}
}
private static void failCommand(Config config, Throwable e) throws TerseException {
if (e instanceof TerseException) {
throw (TerseException) e;
}
if (config.command == Command.LIST) {
throw new RuntimeException("Unexpected error occurred while listing plugins", e);
} else if (config.command == Command.SYNC_MANIFESTS) {
// The real write errors are propagated as a TerseException, and don't take this branch.
throw new RuntimeException("Unexpected error occurred while dry-running sync", e);
}
}
@ -385,108 +439,4 @@ public class ConnectPluginPath { @@ -385,108 +439,4 @@ public class ConnectPluginPath {
PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Collections.singleton(source));
return new PluginScanResult(Arrays.asList(serviceLoadResult, reflectiveResult));
}
private static class ManifestEntry {
private final URI manifestURI;
private final String className;
private final PluginType type;
private ManifestEntry(URI manifestURI, String className, PluginType type) {
this.manifestURI = manifestURI;
this.className = className;
this.type = type;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ManifestEntry that = (ManifestEntry) o;
return manifestURI.equals(that.manifestURI) && className.equals(that.className) && type == that.type;
}
@Override
public int hashCode() {
return Objects.hash(manifestURI, className, type);
}
}
private static Map<String, List<ManifestEntry>> findManifests(PluginSource source, Map<String, List<ManifestEntry>> exclude) {
Map<String, List<ManifestEntry>> manifests = new LinkedHashMap<>();
for (PluginType type : PluginType.values()) {
try {
Enumeration<URL> resources = source.loader().getResources(MANIFEST_PREFIX + type.superClass().getName());
while (resources.hasMoreElements()) {
URL url = resources.nextElement();
for (String className : parse(url)) {
ManifestEntry e = new ManifestEntry(url.toURI(), className, type);
manifests.computeIfAbsent(className, ignored -> new ArrayList<>()).add(e);
}
}
} catch (URISyntaxException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
for (Map.Entry<String, List<ManifestEntry>> entry : exclude.entrySet()) {
String className = entry.getKey();
List<ManifestEntry> excluded = entry.getValue();
// Note this must be a remove and not removeAll, because we want to remove only one copy at a time.
// If the same jar is present on the classpath and plugin path, then manifests will contain 2 identical
// ManifestEntry instances, with a third copy in the excludes. After the excludes are processed,
// manifests should contain exactly one copy of the ManifestEntry.
for (ManifestEntry e : excluded) {
manifests.getOrDefault(className, Collections.emptyList()).remove(e);
}
}
return manifests;
}
// Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11
private static Set<String> parse(URL u) {
Set<String> names = new LinkedHashSet<>(); // preserve insertion order
try {
URLConnection uc = u.openConnection();
uc.setUseCaches(false);
try (InputStream in = uc.getInputStream();
BufferedReader r
= new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
int lc = 1;
while ((lc = parseLine(u, r, lc, names)) >= 0) {
// pass
}
}
} catch (IOException x) {
throw new RuntimeException("Error accessing configuration file", x);
}
return names;
}
// Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11
private static int parseLine(URL u, BufferedReader r, int lc, Set<String> names) throws IOException {
String ln = r.readLine();
if (ln == null) {
return -1;
}
int ci = ln.indexOf('#');
if (ci >= 0) ln = ln.substring(0, ci);
ln = ln.trim();
int n = ln.length();
if (n != 0) {
if ((ln.indexOf(' ') >= 0) || (ln.indexOf('\t') >= 0))
throw new IOException("Illegal configuration-file syntax in " + u);
int cp = ln.codePointAt(0);
if (!Character.isJavaIdentifierStart(cp))
throw new IOException("Illegal provider-class name: " + ln + " in " + u);
int start = Character.charCount(cp);
for (int i = start; i < n; i += Character.charCount(cp)) {
cp = ln.codePointAt(i);
if (!Character.isJavaIdentifierPart(cp) && (cp != '.'))
throw new IOException("Illegal provider-class name: " + ln + " in " + u);
}
names.add(ln);
}
return lc + 1;
}
}

603
tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java

@ -0,0 +1,603 @@ @@ -0,0 +1,603 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.runtime.isolation.PluginSource;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
* An in-memory workspace for manipulating {@link java.util.ServiceLoader} manifest files.
* <p>Use {@link #forSource(PluginSource)} to get a workspace scoped to a single plugin location, which is able
* to accept simulated reads and writes of manifests.
* Write the simulated changes to disk via {@link #commit(boolean)}.
*/
public class ManifestWorkspace {
private static final Logger log = LoggerFactory.getLogger(ManifestWorkspace.class);
private static final String MANIFEST_PREFIX = "META-INF/services/";
private static final Path MANAGED_PATH = Paths.get("connect-plugin-path-shim-1.0.0.jar");
private static final String MANIFEST_HEADER = "# Generated by connect-plugin-path.sh " + AppInfoParser.getVersion();
private final PrintStream out;
private final List<SourceWorkspace<?>> workspaces;
private final Map<Path, Path> temporaryOverlayFiles;
public ManifestWorkspace(PrintStream out) {
this.out = out;
workspaces = new ArrayList<>();
temporaryOverlayFiles = new HashMap<>();
}
public SourceWorkspace<?> forSource(PluginSource source) throws IOException {
SourceWorkspace<?> sourceWorkspace;
switch (source.type()) {
case CLASSPATH:
sourceWorkspace = new ClasspathWorkspace(source);
break;
case MULTI_JAR:
sourceWorkspace = new MultiJarWorkspace(source);
break;
case SINGLE_JAR:
sourceWorkspace = new SingleJarWorkspace(source);
break;
case CLASS_HIERARCHY:
sourceWorkspace = new ClassHierarchyWorkspace(source);
break;
default:
throw new IllegalStateException("Unknown source type " + source.type());
}
workspaces.add(sourceWorkspace);
return sourceWorkspace;
}
/**
* Commits all queued changes to disk
* @return true if any workspace wrote changes to disk, false if all workspaces did not have writes to apply
* @throws IOException if an error occurs reading or writing to the filesystem
* @throws TerseException if a path is not writable on disk and should be.
*/
public boolean commit(boolean dryRun) throws IOException, TerseException {
boolean changed = false;
for (SourceWorkspace<?> workspace : workspaces) {
changed |= workspace.commit(dryRun);
}
return changed;
}
/**
* A workspace scoped to a single plugin source.
* <p>Buffers simulated reads and writes to the plugin path before they can be written to disk.
* @param <T> The data structure used by the workspace to store in-memory manifests internally.
*/
public static abstract class SourceWorkspace<T> {
private final Path location;
private final PluginSource.Type type;
protected final T initial;
protected final T manifests;
private SourceWorkspace(PluginSource source) throws IOException {
this.location = source.location();
this.type = source.type();
this.initial = load(source);
this.manifests = load(source);
}
public Path location() {
return location;
}
public PluginSource.Type type() {
return type;
}
protected abstract T load(PluginSource source) throws IOException;
public abstract boolean hasManifest(PluginType type, String className);
public abstract void forEach(BiConsumer<String, PluginType> consumer);
public abstract void addManifest(PluginType type, String pluginClass);
public abstract void removeManifest(PluginType type, String pluginClass);
protected abstract boolean commit(boolean dryRun) throws TerseException, IOException;
protected static Map<PluginType, Set<String>> loadManifest(URL baseUrl) throws MalformedURLException {
Map<PluginType, Set<String>> manifests = new EnumMap<>(PluginType.class);
for (PluginType type : PluginType.values()) {
Set<String> result;
try {
URL u = new URL(baseUrl, MANIFEST_PREFIX + type.superClass().getName());
result = parse(u);
} catch (RuntimeException e) {
result = new LinkedHashSet<>();
}
manifests.put(type, result);
}
return manifests;
}
protected static URL jarBaseUrl(URL fileUrl) throws MalformedURLException {
return new URL("jar", "", -1, fileUrl + "!/", null);
}
protected static void forEach(Map<PluginType, Set<String>> manifests, BiConsumer<String, PluginType> consumer) {
manifests.forEach((type, classNames) -> classNames.forEach(className -> consumer.accept(className, type)));
}
}
/**
* A single jar can only contain one manifest per plugin type.
*/
private class SingleJarWorkspace extends SourceWorkspace<Map<PluginType, Set<String>>> {
private SingleJarWorkspace(PluginSource source) throws IOException {
super(source);
assert source.urls().length == 1;
}
@Override
protected Map<PluginType, Set<String>> load(PluginSource source) throws IOException {
return loadManifest(jarBaseUrl(source.urls()[0]));
}
@Override
public boolean hasManifest(PluginType type, String className) {
return manifests.get(type).contains(className);
}
@Override
public void forEach(BiConsumer<String, PluginType> consumer) {
forEach(manifests, consumer);
}
@Override
public void addManifest(PluginType type, String pluginClass) {
manifests.get(type).add(pluginClass);
}
@Override
public void removeManifest(PluginType type, String pluginClass) {
manifests.get(type).remove(pluginClass);
}
@Override
protected boolean commit(boolean dryRun) throws IOException, TerseException {
if (startSync(dryRun, location(), initial, manifests)) {
rewriteJar(dryRun, location(), manifests);
return true;
}
return false;
}
}
/**
* A classpath workspace is backed by multiple jars, and is not writable.
* The in-memory format is a map from jar path to the manifests contained in that jar.
* The control flow of the caller should not perform writes, so these exceptions indicate a bug in the program.
*/
private class ClasspathWorkspace extends SourceWorkspace<Map<Path, Map<PluginType, Set<String>>>> {
private ClasspathWorkspace(PluginSource source) throws IOException {
super(source);
}
@Override
protected Map<Path, Map<PluginType, Set<String>>> load(PluginSource source) throws IOException {
Map<Path, Map<PluginType, Set<String>>> manifestsBySubLocation = new HashMap<>();
for (URL url : source.urls()) {
Path jarPath = Paths.get(url.getPath());
manifestsBySubLocation.put(jarPath, loadManifest(jarBaseUrl(url)));
}
return manifestsBySubLocation;
}
public boolean hasManifest(PluginType type, String className) {
return manifests.values()
.stream()
.map(m -> m.get(type))
.anyMatch(s -> s.contains(className));
}
public void forEach(BiConsumer<String, PluginType> consumer) {
manifests.values().forEach(m -> forEach(m, consumer));
}
@Override
public void addManifest(PluginType type, String pluginClass) {
throw new UnsupportedOperationException("Cannot change the contents of the classpath");
}
@Override
public void removeManifest(PluginType type, String pluginClass) {
throw new UnsupportedOperationException("Cannot change the contents of the classpath");
}
@Override
protected boolean commit(boolean dryRun) throws IOException, TerseException {
// There is never anything to commit for the classpath
return false;
}
}
/**
* A multi-jar workspace is similar to the classpath workspace because it has multiple jars.
* However, the multi-jar workspace is writable, and injects a managed jar where it writes added manifests.
*/
private class MultiJarWorkspace extends ClasspathWorkspace {
private MultiJarWorkspace(PluginSource source) throws IOException {
super(source);
}
@Override
protected Map<Path, Map<PluginType, Set<String>>> load(PluginSource source) throws IOException {
Map<Path, Map<PluginType, Set<String>>> manifests = super.load(source);
// In addition to the normal multi-jar paths, inject a managed jar where we can add manifests.
Path managedPath = source.location().resolve(MANAGED_PATH);
URL url = managedPath.toUri().toURL();
manifests.put(managedPath, loadManifest(jarBaseUrl(url)));
return manifests;
}
@Override
public void addManifest(PluginType type, String pluginClass) {
// Add plugins to the managed manifest
manifests.get(location().resolve(MANAGED_PATH)).get(type).add(pluginClass);
}
@Override
public void removeManifest(PluginType type, String pluginClass) {
// If a plugin appears in multiple manifests, remove it from all of them.
for (Map<PluginType, Set<String>> manifestState : manifests.values()) {
manifestState.get(type).remove(pluginClass);
}
}
@Override
public boolean commit(boolean dryRun) throws IOException, TerseException {
boolean changed = false;
for (Map.Entry<Path, Map<PluginType, Set<String>>> manifestSource : manifests.entrySet()) {
Path jarPath = manifestSource.getKey();
Map<PluginType, Set<String>> before = initial.get(jarPath);
Map<PluginType, Set<String>> after = manifestSource.getValue();
if (startSync(dryRun, jarPath, before, after)) {
rewriteJar(dryRun, jarPath, after);
changed = true;
}
}
return changed;
}
}
/**
* The class hierarchy is similar to the single-jar because there can only be one manifest per type.
* However, the path to that single manifest is accessed via the pluginLocation.
*/
private class ClassHierarchyWorkspace extends SingleJarWorkspace {
private ClassHierarchyWorkspace(PluginSource source) throws IOException {
super(source);
}
@Override
protected Map<PluginType, Set<String>> load(PluginSource source) throws IOException {
return loadManifest(source.location().toUri().toURL());
}
protected boolean commit(boolean dryRun) throws IOException, TerseException {
if (startSync(dryRun, location(), initial, manifests)) {
rewriteClassHierarchyManifest(dryRun, location(), manifests);
return true;
}
return false;
}
}
private boolean startSync(boolean dryRun, Path syncLocation, Map<PluginType, Set<String>> before, Map<PluginType, Set<String>> after) {
Objects.requireNonNull(syncLocation, "syncLocation must be non-null");
Objects.requireNonNull(before, "before must be non-null");
Objects.requireNonNull(after, "after must be non-null");
if (before.equals(after)) {
return false;
}
Set<String> added = new HashSet<>();
after.values().forEach(added::addAll);
before.values().forEach(added::removeAll);
Set<String> removed = new HashSet<>();
before.values().forEach(removed::addAll);
after.values().forEach(removed::removeAll);
out.printf("%sSync\t%s Add %s Remove %s%n", dryRun ? "Dry Run " : "", syncLocation, added.size(), removed.size());
for (String add : added) {
out.printf("\tAdd\t%s%n", add);
}
for (String rem : removed) {
out.printf("\tRemove\t%s%n", rem);
}
return true;
}
/**
* Rewrite a jar on disk to have manifests with the specified entries.
* Will create the jar file if it does not exist and at least one manifest element is specified.
* Will delete the jar file if it exists, and would otherwise remain empty.
*
* @param dryRun True if the rewrite should be applied, false if it should be simulated.
* @param jarPath Path to a jar file for a plugin
* @param manifestElements Map from plugin type to Class names of plugins which should appear in that manifest
*/
private void rewriteJar(boolean dryRun, Path jarPath, Map<PluginType, Set<String>> manifestElements) throws IOException, TerseException {
Objects.requireNonNull(jarPath, "jarPath must be non-null");
Objects.requireNonNull(manifestElements, "manifestState must be non-null");
Path writableJar = getWritablePath(dryRun, jarPath);
if (nonEmpty(manifestElements) && !Files.exists(writableJar)) {
log.debug("Create {}", jarPath);
createJar(writableJar);
}
try (FileSystem jar = FileSystems.newFileSystem(
new URI("jar", writableJar.toUri().toString(), ""),
Collections.emptyMap()
)) {
Path zipRoot = jar.getRootDirectories().iterator().next();
// Set dryRun to false because this jar file is already a writable copy.
rewriteClassHierarchyManifest(false, zipRoot, manifestElements);
} catch (URISyntaxException e) {
throw new IOException(e);
}
if (Files.exists(writableJar) && jarIsEmpty(writableJar)) {
Files.delete(writableJar);
}
}
private static boolean nonEmpty(Map<PluginType, Set<String>> manifestElements) {
return !manifestElements.values().stream().allMatch(Collection::isEmpty);
}
private void createJar(Path path) throws IOException {
Objects.requireNonNull(path, "path must be non-null");
try (ZipOutputStream stream = new ZipOutputStream(Files.newOutputStream(
path,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING
))) {
stream.closeEntry();
}
}
private boolean jarIsEmpty(Path path) throws IOException {
Objects.requireNonNull(path, "path must be non-null");
try (ZipInputStream stream = new ZipInputStream(Files.newInputStream(
path,
StandardOpenOption.READ
))) {
return stream.getNextEntry() == null;
}
}
/**
* Rewrite multiple manifest files contained inside a class hierarchy.
* Will create the files and parent directories if they do not exist and at least one element is specified.
* Will delete the files and parent directories if they exist and would otherwise remain empty.
*
* @param dryRun True if the rewrite should be applied, false if it should be simulated.
* @param pluginLocation Path to top-level of the class hierarchy
* @param manifestElements Map from plugin type to Class names of plugins which should appear in that manifest
*/
private void rewriteClassHierarchyManifest(boolean dryRun, Path pluginLocation, Map<PluginType, Set<String>> manifestElements) throws IOException, TerseException {
Objects.requireNonNull(pluginLocation, "pluginLocation must be non-null");
Objects.requireNonNull(manifestElements, "manifestState must be non-null");
if (!Files.exists(pluginLocation)) {
throw new TerseException(pluginLocation + " does not exist");
}
if (!Files.isWritable(pluginLocation)) {
throw new TerseException(pluginLocation + " is not writable");
}
Path metaInfPath = pluginLocation.resolve("META-INF");
Path servicesPath = metaInfPath.resolve("services");
if (nonEmpty(manifestElements) && !Files.exists(servicesPath) && !dryRun) {
Files.createDirectories(servicesPath);
}
for (Map.Entry<PluginType, Set<String>> manifest : manifestElements.entrySet()) {
PluginType type = manifest.getKey();
Set<String> elements = manifest.getValue();
rewriteManifestFile(dryRun, servicesPath.resolve(type.superClass().getName()), elements);
}
deleteDirectoryIfEmpty(dryRun, servicesPath);
deleteDirectoryIfEmpty(dryRun, metaInfPath);
}
private void deleteDirectoryIfEmpty(boolean dryRun, Path path) throws IOException, TerseException {
if (!Files.exists(path)) {
return;
}
if (!Files.isWritable(path)) {
throw new TerseException(path + " is not writable");
}
try (Stream<Path> list = Files.list(path)) {
if (list.findAny().isPresent()) {
return;
}
}
log.debug("Delete {}", path);
if (!dryRun) {
Files.delete(path);
}
}
/**
* Rewrite a single manifest file.
* Will create the file if it does not exist and at least one element is specified.
* Will delete the file if it exists and no elements are specified.
*
* @param dryRun True if the rewrite should be applied, false if it should be simulated.
* @param filePath Path to file which should be rewritten.
* @param elements Class names of plugins which should appear in the manifest
*/
private void rewriteManifestFile(boolean dryRun, Path filePath, Set<String> elements) throws IOException, TerseException {
Objects.requireNonNull(filePath, "filePath must be non-null");
Objects.requireNonNull(elements, "elements must be non-null");
Path writableFile = getWritablePath(dryRun, filePath);
if (elements.isEmpty()) {
if (Files.exists(filePath)) {
log.debug("Delete {}", filePath);
if (!dryRun) {
Files.delete(writableFile);
}
}
} else {
if (!Files.exists(filePath)) {
log.debug("Create {}", filePath);
}
log.debug("Write {} with content {}", filePath, elements);
if (!dryRun) {
try (OutputStream stream = new BufferedOutputStream(Files.newOutputStream(
writableFile,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING
))) {
byte[] newline = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
stream.write(MANIFEST_HEADER.getBytes(StandardCharsets.UTF_8));
stream.write(newline);
for (String element : elements) {
stream.write(element.getBytes(StandardCharsets.UTF_8));
stream.write(newline);
}
}
}
}
}
/**
* Get a path which is always writable
* @param dryRun If true, substitute a temporary file instead of the real file on disk.
* @param path Path which must be writable
* @return Path which is writable, and may be different from the input path
*/
private Path getWritablePath(boolean dryRun, Path path) throws IOException, TerseException {
Objects.requireNonNull(path, "path must be non-null");
for (Path parent = path; parent != null && !Files.isWritable(parent); parent = parent.getParent()) {
if (Files.exists(parent) && !Files.isWritable(parent)) {
throw new TerseException("Path " + path + " must be writable");
}
}
if (dryRun) {
if (!temporaryOverlayFiles.containsKey(path)) {
Path fileName = path.getFileName();
String suffix = fileName != null ? fileName.toString() : ".temp";
Path temp = Files.createTempFile("connect-plugin-path-temporary-", suffix);
if (Files.exists(path)) {
Files.copy(path, temp, StandardCopyOption.REPLACE_EXISTING);
temp.toFile().deleteOnExit();
} else {
Files.delete(temp);
}
temporaryOverlayFiles.put(path, temp);
return temp;
}
return temporaryOverlayFiles.get(path);
}
return path;
}
// Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11
private static Set<String> parse(URL u) {
Set<String> names = new LinkedHashSet<>(); // preserve insertion order
try {
URLConnection uc = u.openConnection();
uc.setUseCaches(false);
try (InputStream in = uc.getInputStream();
BufferedReader r
= new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
int lc = 1;
while ((lc = parseLine(u, r, lc, names)) >= 0) {
// pass
}
}
} catch (IOException x) {
throw new RuntimeException("Error accessing configuration file", x);
}
return names;
}
// Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11
private static int parseLine(URL u, BufferedReader r, int lc, Set<String> names) throws IOException {
String ln = r.readLine();
if (ln == null) {
return -1;
}
int ci = ln.indexOf('#');
if (ci >= 0) ln = ln.substring(0, ci);
ln = ln.trim();
int n = ln.length();
if (n != 0) {
if ((ln.indexOf(' ') >= 0) || (ln.indexOf('\t') >= 0))
throw new IOException("Illegal configuration-file syntax in " + u);
int cp = ln.codePointAt(0);
if (!Character.isJavaIdentifierStart(cp))
throw new IOException("Illegal provider-class name: " + ln + " in " + u);
int start = Character.charCount(cp);
for (int i = start; i < n; i += Character.charCount(cp)) {
cp = ln.codePointAt(i);
if (!Character.isJavaIdentifierPart(cp) && (cp != '.'))
throw new IOException("Illegal provider-class name: " + ln + " in " + u);
}
names.add(ln);
}
return lc + 1;
}
}

219
tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java

@ -20,6 +20,7 @@ import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory; @@ -20,6 +20,7 @@ import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
import org.apache.kafka.connect.runtime.isolation.PluginSource;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
@ -52,14 +53,17 @@ import java.util.Map; @@ -52,14 +53,17 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ConnectPluginPathTest {
@ -107,7 +111,7 @@ public class ConnectPluginPathTest { @@ -107,7 +111,7 @@ public class ConnectPluginPathTest {
setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN)
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertNonMigratedPluginsPresent(table);
assertNonMigratedPluginsStatus(table, false);
}
@ParameterizedTest
@ -121,7 +125,7 @@ public class ConnectPluginPathTest { @@ -121,7 +125,7 @@ public class ConnectPluginPathTest {
setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE)
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertNonMigratedPluginsPresent(table);
assertNonMigratedPluginsStatus(table, false);
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE);
}
@ -169,7 +173,7 @@ public class ConnectPluginPathTest { @@ -169,7 +173,7 @@ public class ConnectPluginPathTest {
TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED))
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertBadPackagingPluginsPresent(table);
assertBadPackagingPluginsStatus(table, false);
}
@ParameterizedTest
@ -187,11 +191,156 @@ public class ConnectPluginPathTest { @@ -187,11 +191,156 @@ public class ConnectPluginPathTest {
TestPlugins.TestPlugin.SERVICE_LOADER))
);
Map<String, List<String[]>> table = assertListSuccess(res);
assertNonMigratedPluginsPresent(table);
assertNonMigratedPluginsStatus(table, false);
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.SERVICE_LOADER);
}
@ParameterizedTest
@EnumSource
public void testSyncManifests(PluginLocationType type) {
PluginLocation locationA, locationB;
CommandResult res = runCommand(
"sync-manifests",
"--plugin-location",
locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION),
"--plugin-location",
locationB = setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER)
);
assertEquals(0, res.returnCode);
assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.reflective);
assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.serviceLoading);
Map<String, List<String[]>> table = assertListSuccess(runCommand(
"list",
"--plugin-location",
locationA,
"--plugin-location",
locationB
));
// Non-migrated plugins get new manifests
assertNonMigratedPluginsStatus(table, true);
assertBadPackagingPluginsStatus(table, true);
}
@ParameterizedTest
@EnumSource
public void testSyncManifestsDryRun(PluginLocationType type) {
PluginLocation locationA, locationB;
CommandResult res = runCommand(
"sync-manifests",
"--plugin-location",
locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION),
"--plugin-location",
locationB = setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER),
"--dry-run"
);
assertEquals(0, res.returnCode);
assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.reflective);
assertScanResult(false, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.serviceLoading);
Map<String, List<String[]>> table = assertListSuccess(runCommand(
"list",
"--plugin-location",
locationA,
"--plugin-location",
locationB
));
// Plugins are not migrated during a dry-run.
assertNonMigratedPluginsStatus(table, false);
assertBadPackagingPluginsStatus(table, false);
}
@ParameterizedTest
@EnumSource
public void testSyncManifestsDryRunReadOnlyLocation(PluginLocationType type) {
PluginLocation locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
assertTrue(locationA.path.toFile().setReadOnly());
CommandResult res = runCommand(
"sync-manifests",
"--plugin-location",
locationA,
"--dry-run"
);
assertEquals(2, res.returnCode);
}
@Test
public void testSyncManifestsDryRunReadOnlyMetaInf() {
PluginLocationType type = PluginLocationType.CLASS_HIERARCHY;
PluginLocation locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
String subPath = "META-INF";
assertTrue(locationA.path.resolve(subPath).toFile().setReadOnly());
CommandResult res = runCommand(
"sync-manifests",
"--plugin-location",
locationA,
"--dry-run"
);
assertEquals(2, res.returnCode);
}
@Test
public void testSyncManifestsDryRunReadOnlyServices() {
PluginLocationType type = PluginLocationType.CLASS_HIERARCHY;
PluginLocation locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
String subPath = "META-INF/services";
assertTrue(locationA.path.resolve(subPath).toFile().setReadOnly());
CommandResult res = runCommand(
"sync-manifests",
"--plugin-location",
locationA,
"--dry-run"
);
assertEquals(2, res.returnCode);
}
@Test
public void testSyncManifestsDryRunReadOnlyManifest() {
PluginLocationType type = PluginLocationType.CLASS_HIERARCHY;
PluginLocation locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
String subPath = "META-INF/services/" + PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY.superClass().getName();
assertTrue(locationA.path.resolve(subPath).toFile().setReadOnly());
CommandResult res = runCommand(
"sync-manifests",
"--plugin-location",
locationA,
"--dry-run"
);
assertEquals(2, res.returnCode);
}
@ParameterizedTest
@EnumSource
public void testSyncManifestsKeepNotFound(PluginLocationType type) {
PluginLocation locationA, locationB;
CommandResult res = runCommand(
"sync-manifests",
"--plugin-location",
locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION),
"--plugin-location",
locationB = setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER),
"--keep-not-found"
);
assertEquals(0, res.returnCode);
assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.reflective);
assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.serviceLoading);
assertScanResult(false, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION, res.reflective);
assertScanResult(false, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION, res.serviceLoading);
Map<String, List<String[]>> table = assertListSuccess(runCommand(
"list",
"--plugin-location",
locationA,
"--plugin-location",
locationB
));
// Non-migrated plugins get new manifests
assertNonMigratedPluginsStatus(table, true);
// Because --keep-not-found is specified, the bad packaging plugins keep their manifests
assertBadPackagingPluginsStatus(table, false);
}
private static Map<String, List<String[]>> assertListSuccess(CommandResult result) {
assertEquals(0, result.returnCode);
@ -204,24 +353,26 @@ public class ConnectPluginPathTest { @@ -204,24 +353,26 @@ public class ConnectPluginPathTest {
assertPluginMigrationStatus(table, true, true, plugins);
}
private static void assertNonMigratedPluginsPresent(Map<String, List<String[]>> table) {
assertPluginMigrationStatus(table, true, false,
private static void assertNonMigratedPluginsStatus(Map<String, List<String[]>> table, boolean migrated) {
// These plugins are missing manifests that get added during the migration
assertPluginMigrationStatus(table, true, migrated,
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
TestPlugins.TestPlugin.NON_MIGRATED_HEADER_CONVERTER,
TestPlugins.TestPlugin.NON_MIGRATED_PREDICATE,
TestPlugins.TestPlugin.NON_MIGRATED_SINK_CONNECTOR,
TestPlugins.TestPlugin.NON_MIGRATED_SOURCE_CONNECTOR,
TestPlugins.TestPlugin.NON_MIGRATED_TRANSFORMATION);
// This plugin is partially compatible
assertPluginMigrationStatus(table, true, null,
// This plugin is partially compatible, and becomes fully compatible during migration.
assertPluginMigrationStatus(table, true, migrated ? true : null,
TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
}
private static void assertBadPackagingPluginsPresent(Map<String, List<String[]>> table) {
private static void assertBadPackagingPluginsStatus(Map<String, List<String[]>> table, boolean migrated) {
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED,
TestPlugins.TestPlugin.BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR);
assertPluginMigrationStatus(table, false, true,
// These plugins have manifests that get removed during the migration
assertPluginMigrationStatus(table, false, !migrated,
TestPlugins.TestPlugin.BAD_PACKAGING_MISSING_SUPERCLASS,
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_CONNECTOR,
TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR,
@ -233,7 +384,6 @@ public class ConnectPluginPathTest { @@ -233,7 +384,6 @@ public class ConnectPluginPathTest {
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION);
}
private static void assertIsolatedPluginsInOutput(PluginScanResult reflectiveResult, Map<String, List<String[]>> table) {
reflectiveResult.forEach(pluginDesc -> {
if (pluginDesc.location().equals("classpath")) {
@ -264,19 +414,38 @@ public class ConnectPluginPathTest { @@ -264,19 +414,38 @@ public class ConnectPluginPathTest {
private static void assertPluginMigrationStatus(Map<String, List<String[]>> table, Boolean loadable, Boolean compatible, TestPlugins.TestPlugin... plugins) {
for (TestPlugins.TestPlugin plugin : plugins) {
assertTrue(table.containsKey(plugin.className()), "Plugin " + plugin.className() + " does not appear in list output");
for (String[] row : table.get(plugin.className())) {
log.info("row" + Arrays.toString(row));
if (loadable != null) {
assertEquals(loadable, Boolean.parseBoolean(row[LOADABLE_COL]), "Plugin loadable column for " + plugin.className() + " incorrect");
}
if (compatible != null) {
assertEquals(compatible, Boolean.parseBoolean(row[MANIFEST_COL]), "Plugin hasManifest column for " + plugin.className() + " incorrect");
if (loadable == null || loadable || compatible == null || compatible) {
assertTrue(table.containsKey(plugin.className()), "Plugin " + plugin.className() + " does not appear in list output");
for (String[] row : table.get(plugin.className())) {
log.info("row" + Arrays.toString(row));
if (loadable != null) {
assertEquals(loadable, Boolean.parseBoolean(row[LOADABLE_COL]), "Plugin loadable column for " + plugin.className() + " incorrect");
}
if (compatible != null) {
assertEquals(compatible, Boolean.parseBoolean(row[MANIFEST_COL]), "Plugin hasManifest column for " + plugin.className() + " incorrect");
}
}
} else {
// The plugins are not loadable or have manifests, so it should not be visible at all.
assertFalse(table.containsKey(plugin.className()), "Plugin " + plugin.className() + " should not appear in list output");
}
}
}
private static void assertScanResult(boolean expectToBeDiscovered, TestPlugins.TestPlugin plugin, PluginScanResult result) {
AtomicBoolean actuallyDiscovered = new AtomicBoolean();
result.forEach(pluginDesc -> {
if (pluginDesc.className().equals(plugin.className())) {
actuallyDiscovered.set(true);
}
});
if (expectToBeDiscovered && !actuallyDiscovered.get()) {
fail("Expected plugin " + plugin + " to be discoverable, but it was not.");
} else if (!expectToBeDiscovered && actuallyDiscovered.get()) {
fail("Expected plugin " + plugin + " to not be discoverable, but it was.");
}
}
private enum PluginLocationType {
CLASS_HIERARCHY,
SINGLE_JAR,
@ -326,12 +495,15 @@ public class ConnectPluginPathTest { @@ -326,12 +495,15 @@ public class ConnectPluginPathTest {
Path outputJar = path.resolveSibling(path.getFileName() + ".jar");
outputJar.getParent().toFile().mkdirs();
Files.copy(jarPath, outputJar, StandardCopyOption.REPLACE_EXISTING);
outputJar.toUri().toURL().openConnection().setDefaultUseCaches(false);
disableCaching(outputJar);
return new PluginLocation(outputJar);
}
case MULTI_JAR: {
Path outputJar = path.resolve(jarPath.getFileName());
outputJar.getParent().toFile().mkdirs();
Files.copy(jarPath, outputJar, StandardCopyOption.REPLACE_EXISTING);
disableCaching(outputJar);
return new PluginLocation(path);
}
default:
@ -342,6 +514,15 @@ public class ConnectPluginPathTest { @@ -342,6 +514,15 @@ public class ConnectPluginPathTest {
}
}
private static void disableCaching(Path path) throws IOException {
// This function is a workaround for a Java 8 caching bug. When Java 8 support is dropped it may be removed.
// This test runs the sync-manifests command, and _without stopping the jvm_ executes a list command.
// Under normal use, the sync-manifests command is followed immediately by a JVM shutdown, clearing caches.
// The Java 8 ServiceLoader does not disable the URLConnection caching, so doesn't read some previous writes.
// Java 9+ ServiceLoaders disable the URLConnection caching, so don't need this patch (it becomes a no-op)
path.toUri().toURL().openConnection().setDefaultUseCaches(false);
}
private static class PluginPathElement {
private final Path root;
private final List<PluginLocation> locations;

Loading…
Cancel
Save