From 530bc59de2732ba71e82c0110c59e9f6162531c6 Mon Sep 17 00:00:00 2001 From: Soenke Liebau Date: Wed, 31 Jan 2018 08:49:23 -0800 Subject: [PATCH] KAFKA-4930: Enforce set of legal characters for connector names (KIP-212) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …to check for empty connector name and illegal characters in connector name. This also fixes KAFKA-4938 by removing the check for slashes in connector name from ConnectorsResource. Author: Ewen Cheslack-Postava Author: Soenke Liebau Reviewers: Gwen Shapira , Viktor Somogyi , Randall Hauch , Ewen Cheslack-Postava Closes #2755 from soenkeliebau/KAFKA-4930 --- .../apache/kafka/common/config/ConfigDef.java | 34 +++++ .../kafka/common/config/ConfigDefTest.java | 3 + .../connect/runtime/ConnectorConfig.java | 3 +- .../rest/resources/ConnectorsResource.java | 9 +- .../connect/runtime/ConnectorConfigTest.java | 10 +- .../resources/ConnectorsResourceTest.java | 119 ++++++++++++++++-- 6 files changed, 163 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 30802984cac..bb199dde202 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -982,6 +982,40 @@ public class ConfigDef { } } + public static class NonEmptyStringWithoutControlChars implements Validator { + + public static NonEmptyStringWithoutControlChars nonEmptyStringWithoutControlChars() { + return new NonEmptyStringWithoutControlChars(); + } + + @Override + public void ensureValid(String name, Object value) { + String s = (String) value; + + if (s == null) { + // This can happen during creation of the config object due to no default value being defined for the + // name configuration - a missing name parameter is caught when checking for mandatory parameters, + // thus we can ok a null value here + return; + } else if (s.isEmpty()) { + throw new ConfigException(name, value, "String may not be empty"); + } + + // Check name string for illegal characters + ArrayList foundIllegalCharacters = new ArrayList<>(); + + for (int i = 0; i < s.length(); i++) { + if (Character.isISOControl(s.codePointAt(i))) { + foundIllegalCharacters.add(s.codePointAt(i)); + } + } + + if (!foundIllegalCharacters.isEmpty()) { + throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " + Utils.join(foundIllegalCharacters, ", ")); + } + } + } + public static class ConfigKey { public final String name; public final Type type; diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 602147b3114..339c51aa4b1 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -160,6 +160,9 @@ public class ConfigDefTest { testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"}); testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[] {null}); testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"}); + testValidators(Type.STRING, new ConfigDef.NonEmptyStringWithoutControlChars(), "defaultname", + new Object[]{"test", "name", "test/test", "test\u1234", "\u1324name\\", "/+%>&):??<&()?-", "+1", "\uD83D\uDE01", "\uF3B1", " test \n\r", "\n hello \t"}, + new Object[]{"nontrailing\nnotallowed", "as\u0001cii control char", "tes\rt", "test\btest", "1\t2", ""}); } @Test diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index e63d1002a92..aad12c3a4a5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; /** *

@@ -96,7 +97,7 @@ public class ConnectorConfig extends AbstractConfig { public static ConfigDef configDef() { return new ConfigDef() - .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY) + .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY) .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY) .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY) .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 7a0116897ec..e9661046d31 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -90,10 +90,11 @@ public class ConnectorsResource { @Path("/") public Response createConnector(final @QueryParam("forward") Boolean forward, final CreateConnectorRequest createRequest) throws Throwable { - String name = createRequest.name(); - if (name.contains("/")) { - throw new BadRequestException("connector name should not contain '/'"); - } + // Trim leading and trailing whitespaces from the connector name, replace null with empty string + // if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars + // allows null values) + String name = createRequest.name() == null ? "" : createRequest.name().trim(); + Map configs = createRequest.config(); checkAndPutConnectorConfigName(name, configs); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java index f8c4fd690dc..fe1bf264654 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java @@ -90,6 +90,14 @@ public class ConnectorConfigTest> { new ConnectorConfig(MOCK_PLUGINS, props); } + @Test(expected = ConfigException.class) + public void emptyConnectorName() { + Map props = new HashMap<>(); + props.put("name", ""); + props.put("connector.class", TestConnector.class.getName()); + new ConnectorConfig(MOCK_PLUGINS, props); + } + @Test(expected = ConfigException.class) public void wrongTransformationType() { Map props = new HashMap<>(); @@ -168,5 +176,5 @@ public class ConnectorConfigTest> { assertEquals(42, ((SimpleTransformation) transformations.get(0)).magicNumber); assertEquals(84, ((SimpleTransformation) transformations.get(1)).magicNumber); } - + } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 6e173497c46..cc4608064ed 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -68,8 +68,11 @@ public class ConnectorsResourceTest { // URL construction properly, avoiding //, which will mess up routing in the REST server private static final String LEADER_URL = "http://leader:8083/"; private static final String CONNECTOR_NAME = "test"; - private static final String CONNECTOR_NAME_SPECIAL_CHARS = "t?a=b&c=d\rx=1.1\n>< \t`'\" x%y+z!#$&'()*+,:;=?@[]"; + private static final String CONNECTOR_NAME_SPECIAL_CHARS = "ta/b&c=d//\\rx=1þ.1>< `'\" x%y+z!ሴ#$&'(æ)*+,:;=?ñ@[]ÿ"; + private static final String CONNECTOR_NAME_CONTROL_SEQUENCES1 = "ta/b&c=drx=1\n.1>< `'\" x%y+z!#$&'()*+,:;=?@[]"; private static final String CONNECTOR2_NAME = "test2"; + private static final String CONNECTOR_NAME_ALL_WHITESPACES = " \t\n \b"; + private static final String CONNECTOR_NAME_PADDING_WHITESPACES = " " + CONNECTOR_NAME + " \n "; private static final Boolean FORWARD = true; private static final Map CONNECTOR_CONFIG_SPECIAL_CHARS = new HashMap<>(); static { @@ -82,6 +85,24 @@ public class ConnectorsResourceTest { CONNECTOR_CONFIG.put("name", CONNECTOR_NAME); CONNECTOR_CONFIG.put("sample_config", "test_config"); } + + private static final Map CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>(); + static { + CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name", CONNECTOR_NAME_CONTROL_SEQUENCES1); + CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("sample_config", "test_config"); + } + + private static final Map CONNECTOR_CONFIG_WITHOUT_NAME = new HashMap<>(); + static { + CONNECTOR_CONFIG_WITHOUT_NAME.put("sample_config", "test_config"); + } + + private static final Map CONNECTOR_CONFIG_WITH_EMPTY_NAME = new HashMap<>(); + + static { + CONNECTOR_CONFIG_WITH_EMPTY_NAME.put(ConnectorConfig.NAME_CONFIG, ""); + CONNECTOR_CONFIG_WITH_EMPTY_NAME.put("sample_config", "test_config"); + } private static final List CONNECTOR_TASK_NAMES = Arrays.asList( new ConnectorTaskId(CONNECTOR_NAME, 0), new ConnectorTaskId(CONNECTOR_NAME, 1) @@ -108,6 +129,12 @@ public class ConnectorsResourceTest { connectorsResource = new ConnectorsResource(herder, null); } + private static final Map getConnectorConfig(Map mapToClone) { + Map result = new HashMap<>(); + result.putAll(mapToClone); + return result; + } + @Test public void testListConnectors() throws Throwable { final Capture>> cb = Capture.newInstance(); @@ -206,20 +233,59 @@ public class ConnectorsResourceTest { PowerMock.verifyAll(); } - @Test(expected = BadRequestException.class) - public void testCreateConnectorWithASlashInItsName() throws Throwable { - String badConnectorName = CONNECTOR_NAME + "/" + "test"; + @Test + public void testCreateConnectorNameTrimWhitespaces() throws Throwable { + // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this + // will affect later tests + Map inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME); + final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig); + final CreateConnectorRequest bodyOut = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG); + + final Capture>> cb = Capture.newInstance(); + herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb)); + expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); + + PowerMock.replayAll(); + + connectorsResource.createConnector(FORWARD, bodyIn); - CreateConnectorRequest body = new CreateConnectorRequest(badConnectorName, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, badConnectorName)); + PowerMock.verifyAll(); + } + + @Test + public void testCreateConnectorNameAllWhitespaces() throws Throwable { + // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this + // will affect later tests + Map inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME); + final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig); + final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME); final Capture>> cb = Capture.newInstance(); - herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); - expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, - ConnectorType.SOURCE))); + herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb)); + expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); PowerMock.replayAll(); - connectorsResource.createConnector(FORWARD, body); + connectorsResource.createConnector(FORWARD, bodyIn); + + PowerMock.verifyAll(); + } + + @Test + public void testCreateConnectorNoName() throws Throwable { + // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this + // will affect later tests + Map inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME); + final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, inputConfig); + final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME); + + final Capture>> cb = Capture.newInstance(); + herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb)); + expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); + + PowerMock.replayAll(); + + connectorsResource.createConnector(FORWARD, bodyIn); PowerMock.verifyAll(); } @@ -342,6 +408,24 @@ public class ConnectorsResourceTest { PowerMock.verifyAll(); } + @Test + public void testCreateConnectorWithControlSequenceInName() throws Throwable { + CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME_CONTROL_SEQUENCES1)); + + final Capture>> cb = Capture.newInstance(); + herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); + expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG, + CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); + + PowerMock.replayAll(); + + String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString(); + String decoded = new URI(rspLocation).getPath(); + Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded); + + PowerMock.verifyAll(); + } + @Test public void testPutConnectorConfigWithSpecialCharsInName() throws Throwable { final Capture>> cb = Capture.newInstance(); @@ -359,6 +443,23 @@ public class ConnectorsResourceTest { PowerMock.verifyAll(); } + @Test + public void testPutConnectorConfigWithControlSequenceInName() throws Throwable { + final Capture>> cb = Capture.newInstance(); + + herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), EasyMock.eq(CONNECTOR_CONFIG_CONTROL_SEQUENCES), EasyMock.eq(true), EasyMock.capture(cb)); + expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG_CONTROL_SEQUENCES, CONNECTOR_TASK_NAMES, + ConnectorType.SINK))); + + PowerMock.replayAll(); + + String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1, FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString(); + String decoded = new URI(rspLocation).getPath(); + Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded); + + PowerMock.verifyAll(); + } + @Test(expected = BadRequestException.class) public void testPutConnectorConfigNameMismatch() throws Throwable { Map connConfig = new HashMap<>(CONNECTOR_CONFIG);