diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index c02980210c2..b9d5470745a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -123,6 +123,10 @@ public class PluginUtils { + "|org\\.slf4j" + ")\\..*$"); + // If the base interface or class that will be used to identify Connect plugins resides within + // the same java package as the plugins that need to be loaded in isolation (and thus are + // added to the WHITELIST), then this base interface or class needs to be excluded in the + // regular expression pattern private static final Pattern WHITELIST = Pattern.compile("^org\\.apache\\.kafka\\.(?:connect\\.(?:" + "transforms\\.(?!Transformation$).*" + "|json\\..*" @@ -131,7 +135,7 @@ public class PluginUtils { + "|storage\\.StringConverter" + "|storage\\.SimpleHeaderConverter" + "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension" - + "|connector\\.policy\\..*" + + "|connector\\.policy\\.(?!ConnectorClientConfigOverridePolicy$).*" + ")" + "|common\\.config\\.provider\\.(?!ConfigProvider$).*" + ")$"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java index f9a2d8f4afa..bf441ff7f1b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -185,6 +185,25 @@ public class PluginUtilsTest { ); } + @Test + public void testConnectorClientConfigOverridePolicy() { + assertFalse(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy") + ); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.connector.policy.AbstractConnectorClientConfigOverridePolicy") + ); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy") + ); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy") + ); + assertTrue(PluginUtils.shouldLoadInIsolation( + "org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy") + ); + } + @Test public void testEmptyPluginUrls() throws Exception { assertEquals(Collections.emptyList(), PluginUtils.pluginUrls(pluginPath));