Browse Source

KAFKA-4930: Enforce set of legal characters for connector names (KIP-212)

…to check for empty connector name and illegal characters in connector name. This also fixes  KAFKA-4938 by removing the check for slashes in connector name from ConnectorsResource.

Author: Ewen Cheslack-Postava <me@ewencp.org>
Author: Soenke Liebau <soenke.liebau@opencore.com>

Reviewers: Gwen Shapira <cshapi@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2755 from soenkeliebau/KAFKA-4930
pull/2755/merge
Soenke Liebau 7 years ago committed by Ewen Cheslack-Postava
parent
commit
530bc59de2
  1. 34
      clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
  2. 3
      clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
  3. 3
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
  4. 9
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
  5. 10
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
  6. 119
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java

34
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java

@ -982,6 +982,40 @@ public class ConfigDef { @@ -982,6 +982,40 @@ public class ConfigDef {
}
}
public static class NonEmptyStringWithoutControlChars implements Validator {
public static NonEmptyStringWithoutControlChars nonEmptyStringWithoutControlChars() {
return new NonEmptyStringWithoutControlChars();
}
@Override
public void ensureValid(String name, Object value) {
String s = (String) value;
if (s == null) {
// This can happen during creation of the config object due to no default value being defined for the
// name configuration - a missing name parameter is caught when checking for mandatory parameters,
// thus we can ok a null value here
return;
} else if (s.isEmpty()) {
throw new ConfigException(name, value, "String may not be empty");
}
// Check name string for illegal characters
ArrayList<Integer> foundIllegalCharacters = new ArrayList<>();
for (int i = 0; i < s.length(); i++) {
if (Character.isISOControl(s.codePointAt(i))) {
foundIllegalCharacters.add(s.codePointAt(i));
}
}
if (!foundIllegalCharacters.isEmpty()) {
throw new ConfigException(name, value, "String may not contain control sequences but had the following ASCII chars: " + Utils.join(foundIllegalCharacters, ", "));
}
}
}
public static class ConfigKey {
public final String name;
public final Type type;

3
clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java

@ -160,6 +160,9 @@ public class ConfigDefTest { @@ -160,6 +160,9 @@ public class ConfigDefTest {
testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[] {null});
testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"});
testValidators(Type.STRING, new ConfigDef.NonEmptyStringWithoutControlChars(), "defaultname",
new Object[]{"test", "name", "test/test", "test\u1234", "\u1324name\\", "/+%>&):??<&()?-", "+1", "\uD83D\uDE01", "\uF3B1", " test \n\r", "\n hello \t"},
new Object[]{"nontrailing\nnotallowed", "as\u0001cii control char", "tes\rt", "test\btest", "1\t2", ""});
}
@Test

3
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java

@ -37,6 +37,7 @@ import java.util.List; @@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
/**
* <p>
@ -96,7 +97,7 @@ public class ConnectorConfig extends AbstractConfig { @@ -96,7 +97,7 @@ public class ConnectorConfig extends AbstractConfig {
public static ConfigDef configDef() {
return new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)

9
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java

@ -90,10 +90,11 @@ public class ConnectorsResource { @@ -90,10 +90,11 @@ public class ConnectorsResource {
@Path("/")
public Response createConnector(final @QueryParam("forward") Boolean forward,
final CreateConnectorRequest createRequest) throws Throwable {
String name = createRequest.name();
if (name.contains("/")) {
throw new BadRequestException("connector name should not contain '/'");
}
// Trim leading and trailing whitespaces from the connector name, replace null with empty string
// if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
// allows null values)
String name = createRequest.name() == null ? "" : createRequest.name().trim();
Map<String, String> configs = createRequest.config();
checkAndPutConnectorConfigName(name, configs);

10
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java

@ -90,6 +90,14 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { @@ -90,6 +90,14 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
new ConnectorConfig(MOCK_PLUGINS, props);
}
@Test(expected = ConfigException.class)
public void emptyConnectorName() {
Map<String, String> props = new HashMap<>();
props.put("name", "");
props.put("connector.class", TestConnector.class.getName());
new ConnectorConfig(MOCK_PLUGINS, props);
}
@Test(expected = ConfigException.class)
public void wrongTransformationType() {
Map<String, String> props = new HashMap<>();
@ -168,5 +176,5 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { @@ -168,5 +176,5 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
assertEquals(42, ((SimpleTransformation) transformations.get(0)).magicNumber);
assertEquals(84, ((SimpleTransformation) transformations.get(1)).magicNumber);
}
}

119
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java

@ -68,8 +68,11 @@ public class ConnectorsResourceTest { @@ -68,8 +68,11 @@ public class ConnectorsResourceTest {
// URL construction properly, avoiding //, which will mess up routing in the REST server
private static final String LEADER_URL = "http://leader:8083/";
private static final String CONNECTOR_NAME = "test";
private static final String CONNECTOR_NAME_SPECIAL_CHARS = "t?a=b&c=d\rx=1.1\n>< \t`'\" x%y+z!#$&'()*+,:;=?@[]";
private static final String CONNECTOR_NAME_SPECIAL_CHARS = "ta/b&c=d//\\rx=1þ.1>< `'\" x%y+z!ሴ#$&'(æ)*+,:;=?ñ@[]ÿ";
private static final String CONNECTOR_NAME_CONTROL_SEQUENCES1 = "ta/b&c=drx=1\n.1>< `'\" x%y+z!#$&'()*+,:;=?@[]";
private static final String CONNECTOR2_NAME = "test2";
private static final String CONNECTOR_NAME_ALL_WHITESPACES = " \t\n \b";
private static final String CONNECTOR_NAME_PADDING_WHITESPACES = " " + CONNECTOR_NAME + " \n ";
private static final Boolean FORWARD = true;
private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS = new HashMap<>();
static {
@ -82,6 +85,24 @@ public class ConnectorsResourceTest { @@ -82,6 +85,24 @@ public class ConnectorsResourceTest {
CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
CONNECTOR_CONFIG.put("sample_config", "test_config");
}
private static final Map<String, String> CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>();
static {
CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name", CONNECTOR_NAME_CONTROL_SEQUENCES1);
CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("sample_config", "test_config");
}
private static final Map<String, String> CONNECTOR_CONFIG_WITHOUT_NAME = new HashMap<>();
static {
CONNECTOR_CONFIG_WITHOUT_NAME.put("sample_config", "test_config");
}
private static final Map<String, String> CONNECTOR_CONFIG_WITH_EMPTY_NAME = new HashMap<>();
static {
CONNECTOR_CONFIG_WITH_EMPTY_NAME.put(ConnectorConfig.NAME_CONFIG, "");
CONNECTOR_CONFIG_WITH_EMPTY_NAME.put("sample_config", "test_config");
}
private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES = Arrays.asList(
new ConnectorTaskId(CONNECTOR_NAME, 0),
new ConnectorTaskId(CONNECTOR_NAME, 1)
@ -108,6 +129,12 @@ public class ConnectorsResourceTest { @@ -108,6 +129,12 @@ public class ConnectorsResourceTest {
connectorsResource = new ConnectorsResource(herder, null);
}
private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) {
Map<String, String> result = new HashMap<>();
result.putAll(mapToClone);
return result;
}
@Test
public void testListConnectors() throws Throwable {
final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
@ -206,20 +233,59 @@ public class ConnectorsResourceTest { @@ -206,20 +233,59 @@ public class ConnectorsResourceTest {
PowerMock.verifyAll();
}
@Test(expected = BadRequestException.class)
public void testCreateConnectorWithASlashInItsName() throws Throwable {
String badConnectorName = CONNECTOR_NAME + "/" + "test";
@Test
public void testCreateConnectorNameTrimWhitespaces() throws Throwable {
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG);
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, bodyIn);
CreateConnectorRequest body = new CreateConnectorRequest(badConnectorName, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, badConnectorName));
PowerMock.verifyAll();
}
@Test
public void testCreateConnectorNameAllWhitespaces() throws Throwable {
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME);
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, body);
connectorsResource.createConnector(FORWARD, bodyIn);
PowerMock.verifyAll();
}
@Test
public void testCreateConnectorNoName() throws Throwable {
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, inputConfig);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME);
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
PowerMock.replayAll();
connectorsResource.createConnector(FORWARD, bodyIn);
PowerMock.verifyAll();
}
@ -342,6 +408,24 @@ public class ConnectorsResourceTest { @@ -342,6 +408,24 @@ public class ConnectorsResourceTest {
PowerMock.verifyAll();
}
@Test
public void testCreateConnectorWithControlSequenceInName() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME_CONTROL_SEQUENCES1));
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
PowerMock.replayAll();
String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
PowerMock.verifyAll();
}
@Test
public void testPutConnectorConfigWithSpecialCharsInName() throws Throwable {
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
@ -359,6 +443,23 @@ public class ConnectorsResourceTest { @@ -359,6 +443,23 @@ public class ConnectorsResourceTest {
PowerMock.verifyAll();
}
@Test
public void testPutConnectorConfigWithControlSequenceInName() throws Throwable {
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), EasyMock.eq(CONNECTOR_CONFIG_CONTROL_SEQUENCES), EasyMock.eq(true), EasyMock.capture(cb));
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG_CONTROL_SEQUENCES, CONNECTOR_TASK_NAMES,
ConnectorType.SINK)));
PowerMock.replayAll();
String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1, FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
PowerMock.verifyAll();
}
@Test(expected = BadRequestException.class)
public void testPutConnectorConfigNameMismatch() throws Throwable {
Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);

Loading…
Cancel
Save