Browse Source

KAFKA-12482 Remove deprecated rest.host.name and rest.port configs (#10841)

Remove the `rest.host.name` and `rest.port` Connect worker configs that were deprecated in KIP-208 and AK 1.1.

Author: Kalpesh Patel <kalpeshpatel.india@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, wenbingshen <oliver.shen999@gmail.com>
pull/10190/head
kpatelatwork 3 years ago committed by GitHub
parent
commit
5652ef1af0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      config/connect-distributed.properties
  2. 65
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
  3. 25
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
  4. 48
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
  5. 31
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
  6. 8
      connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
  7. 4
      docs/upgrade.html

9
config/connect-distributed.properties

@ -69,18 +69,11 @@ offset.flush.interval.ms=10000 @@ -69,18 +69,11 @@ offset.flush.interval.ms=10000
# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# After this parameter is set, rest.host.name/port will not take effect.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
#listeners=HTTP://:8083
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
# DEPRECATED As of 1.1.0: only used when listeners is not set. Use listeners instead.
#rest.host.name=
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" or the rest.host.name/port if configured.
# If not set, it uses the value for "listeners" if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=

65
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java

@ -146,31 +146,13 @@ public class WorkerConfig extends AbstractConfig { @@ -146,31 +146,13 @@ public class WorkerConfig extends AbstractConfig {
+ "data to be committed in a future attempt.";
public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
/**
* @deprecated As of 1.1.0. Only used when listeners is not set. Use listeners instead.
*/
@Deprecated
public static final String REST_HOST_NAME_CONFIG = "rest.host.name";
private static final String REST_HOST_NAME_DOC
= "Hostname for the REST API. If this is set, it will only bind to this interface.\n" +
"Deprecated, only used when listeners is not set. Use listeners instead.";
/**
* @deprecated As of 1.1.0. Only used when listeners is not set. Use listeners instead.
*/
@Deprecated
public static final String REST_PORT_CONFIG = "rest.port";
private static final String REST_PORT_DOC
= "Port for the REST API to listen on.\n" +
"Deprecated, only used when listeners is not set. Use listeners instead.";
public static final int REST_PORT_DEFAULT = 8083;
public static final String LISTENERS_CONFIG = "listeners";
private static final String LISTENERS_DOC
= "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" +
" Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
" Leave hostname empty to bind to default interface.\n" +
" Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084";
static final List<String> LISTENERS_DEFAULT = Collections.singletonList("http://:8083");
public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
private static final String REST_ADVERTISED_HOST_NAME_DOC
@ -203,7 +185,6 @@ public class WorkerConfig extends AbstractConfig { @@ -203,7 +185,6 @@ public class WorkerConfig extends AbstractConfig {
" The supported protocols are HTTP and HTTPS." +
" An empty or blank string will disable this feature." +
" The default behavior is to use the regular listener (specified by the 'listeners' property).";
protected static final List<String> ADMIN_LISTENERS_DEFAULT = null;
public static final String ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX = "admin.listeners.https.";
public static final String PLUGIN_PATH_CONFIG = "plugin.path";
@ -305,9 +286,7 @@ public class WorkerConfig extends AbstractConfig { @@ -305,9 +286,7 @@ public class WorkerConfig extends AbstractConfig {
Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
.define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
.define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
.define(LISTENERS_CONFIG, Type.LIST, null, Importance.LOW, LISTENERS_DOC)
.define(LISTENERS_CONFIG, Type.LIST, LISTENERS_DEFAULT, new ListenersValidator(), Importance.LOW, LISTENERS_DOC)
.define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
.define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
.define(REST_ADVERTISED_LISTENER_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_LISTENER_DOC)
@ -499,6 +478,34 @@ public class WorkerConfig extends AbstractConfig { @@ -499,6 +478,34 @@ public class WorkerConfig extends AbstractConfig {
}
}
private static class ListenersValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
if (!(value instanceof List)) {
throw new ConfigException("Invalid value type for listeners (expected list of URLs , ex: http://localhost:8080,https://localhost:8443).");
}
List<?> items = (List<?>) value;
if (items.isEmpty()) {
throw new ConfigException("Invalid value for listeners, at least one URL is expected, ex: http://localhost:8080,https://localhost:8443.");
}
for (Object item : items) {
if (!(item instanceof String)) {
throw new ConfigException("Invalid type for listeners (expected String).");
}
if (Utils.isBlank((String) item)) {
throw new ConfigException("Empty URL found when parsing listeners list.");
}
}
}
@Override
public String toString() {
return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
}
}
private static class AdminListenersValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
@ -507,27 +514,27 @@ public class WorkerConfig extends AbstractConfig { @@ -507,27 +514,27 @@ public class WorkerConfig extends AbstractConfig {
}
if (!(value instanceof List)) {
throw new ConfigException("Invalid value type (list expected).");
throw new ConfigException("Invalid value type for admin.listeners (expected list).");
}
List items = (List) value;
List<?> items = (List<?>) value;
if (items.isEmpty()) {
return;
}
for (Object item: items) {
for (Object item : items) {
if (!(item instanceof String)) {
throw new ConfigException("Invalid type for admin listener (expected String).");
throw new ConfigException("Invalid type for admin.listeners (expected String).");
}
if (Utils.isBlank((String) item)) {
throw new ConfigException("Empty listener found when parsing list.");
throw new ConfigException("Empty URL found when parsing admin.listeners list.");
}
}
}
@Override
public String toString() {
return "List of comma-separated URIs, ex: http://localhost:8080,https://localhost:8443.";
return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
}
}

25
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java

@ -96,7 +96,7 @@ public class RestServer { @@ -96,7 +96,7 @@ public class RestServer {
public RestServer(WorkerConfig config) {
this.config = config;
List<String> listeners = parseListeners();
List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
jettyServer = new Server();
@ -105,21 +105,6 @@ public class RestServer { @@ -105,21 +105,6 @@ public class RestServer {
createConnectors(listeners, adminListeners);
}
@SuppressWarnings("deprecation")
List<String> parseListeners() {
List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
if (listeners == null || listeners.size() == 0) {
String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);
if (hostname == null)
hostname = "";
listeners = Collections.singletonList(String.format("%s://%s:%d", PROTOCOL_HTTP, hostname, config.getInt(WorkerConfig.REST_PORT_CONFIG)));
}
return listeners;
}
/**
* Adds Jetty connector for each configured listener
*/
@ -127,11 +112,9 @@ public class RestServer { @@ -127,11 +112,9 @@ public class RestServer {
List<Connector> connectors = new ArrayList<>();
for (String listener : listeners) {
if (!listener.isEmpty()) {
Connector connector = createConnector(listener);
connectors.add(connector);
log.info("Added connector for {}", listener);
}
Connector connector = createConnector(listener);
connectors.add(connector);
log.info("Added connector for {}", listener);
}
jettyServer.setConnectors(connectors.toArray(new Connector[0]));

48
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java

@ -25,6 +25,7 @@ import java.util.HashMap; @@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.List;
import static org.apache.kafka.connect.runtime.WorkerConfig.LISTENERS_DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -56,6 +57,47 @@ public class WorkerConfigTest { @@ -56,6 +57,47 @@ public class WorkerConfigTest {
"set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate "
);
@Test
public void testListenersConfigAllowedValues() {
Map<String, String> props = baseProps();
// no value set for "listeners"
WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG));
props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999");
config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
assertEquals(Arrays.asList("http://a.b:9999"), config.getList(WorkerConfig.LISTENERS_CONFIG));
props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.getList(WorkerConfig.LISTENERS_CONFIG));
new WorkerConfig(WorkerConfig.baseConfigDef(), props);
}
@Test
public void testListenersConfigNotAllowedValues() {
Map<String, String> props = baseProps();
assertEquals(LISTENERS_DEFAULT, new WorkerConfig(WorkerConfig.baseConfigDef(), props).getList(WorkerConfig.LISTENERS_CONFIG));
props.put(WorkerConfig.LISTENERS_CONFIG, "");
ConfigException ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
assertTrue(ce.getMessage().contains(" listeners"));
props.put(WorkerConfig.LISTENERS_CONFIG, ",,,");
ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
assertTrue(ce.getMessage().contains(" listeners"));
props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999,");
ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
assertTrue(ce.getMessage().contains(" listeners"));
props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999");
ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
assertTrue(ce.getMessage().contains(" listeners"));
}
@Test
public void testAdminListenersConfigAllowedValues() {
Map<String, String> props = baseProps();
@ -70,7 +112,7 @@ public class WorkerConfigTest { @@ -70,7 +112,7 @@ public class WorkerConfigTest {
props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
assertEquals(config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG), Arrays.asList("http://a.b:9999", "https://a.b:7812"));
assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG));
new WorkerConfig(WorkerConfig.baseConfigDef(), props);
}
@ -78,8 +120,10 @@ public class WorkerConfigTest { @@ -78,8 +120,10 @@ public class WorkerConfigTest {
@Test
public void testAdminListenersNotAllowingEmptyStrings() {
Map<String, String> props = baseProps();
props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999,");
assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
ConfigException ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
assertTrue(ce.getMessage().contains(" admin.listeners"));
}
@Test

31
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java

@ -107,28 +107,6 @@ public class RestServerTest { @@ -107,28 +107,6 @@ public class RestServerTest {
checkCORSRequest("", "http://bar.com", null, null);
}
@SuppressWarnings("deprecation")
@Test
public void testParseListeners() {
// Use listeners field
Map<String, String> configMap = new HashMap<>(baseWorkerProps());
configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
DistributedConfig config = new DistributedConfig(configMap);
server = new RestServer(config);
Assert.assertArrayEquals(new String[] {"http://localhost:8080", "https://localhost:8443"}, server.parseListeners().toArray());
// Build listener from hostname and port
configMap = new HashMap<>(baseWorkerProps());
configMap.remove(WorkerConfig.LISTENERS_CONFIG);
configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname");
configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080");
config = new DistributedConfig(configMap);
server = new RestServer(config);
Assert.assertArrayEquals(new String[] {"http://my-hostname:8080"}, server.parseListeners().toArray());
}
@SuppressWarnings("deprecation")
@Test
public void testAdvertisedUri() {
// Advertised URI from listeners without protocol
@ -167,15 +145,6 @@ public class RestServerTest { @@ -167,15 +145,6 @@ public class RestServerTest {
server = new RestServer(config);
Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString());
// listener from hostname and port
configMap = new HashMap<>(baseWorkerProps());
configMap.remove(WorkerConfig.LISTENERS_CONFIG);
configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname");
configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080");
config = new DistributedConfig(configMap);
server = new RestServer(config);
Assert.assertEquals("http://my-hostname:8080/", server.advertisedUrl().toString());
// correct listener is chosen when https listener is configured before http listener and advertised listener is http
configMap = new HashMap<>(baseWorkerProps());
configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761");

8
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java

@ -53,8 +53,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; @@ -53,8 +53,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.REST_HOST_NAME_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.REST_PORT_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.LISTENERS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG;
@ -237,13 +236,12 @@ public class EmbeddedConnectCluster { @@ -237,13 +236,12 @@ public class EmbeddedConnectCluster {
return workers().stream().allMatch(WorkerHandle::isRunning);
}
@SuppressWarnings("deprecation")
public void startConnect() {
log.info("Starting Connect cluster '{}' with {} workers", connectClusterName, numInitialWorkers);
workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers());
workerProps.put(REST_HOST_NAME_CONFIG, REST_HOST_NAME);
workerProps.put(REST_PORT_CONFIG, "0"); // use a random available port
// use a random available port
workerProps.put(LISTENERS_CONFIG, "HTTP://" + REST_HOST_NAME + ":0");
String internalTopicsReplFactor = String.valueOf(numBrokers);
putIfAbsent(workerProps, GROUP_ID_CONFIG, "connect-integration-test-" + connectClusterName);

4
docs/upgrade.html

@ -31,7 +31,7 @@ @@ -31,7 +31,7 @@
For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
<li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
<li>A number of deprecated classes, methods and tools have been removed from the <code>clients</code>, <code>core</code> and <code>tools</code> modules:</li>
<li>A number of deprecated classes, methods and tools have been removed from the <code>clients</code>, <code>connect</code>, <code>core</code> and <code>tools</code> modules:</li>
<ul>
<li>The Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
and <code>AclAuthorizer</code> instead.</li>
@ -73,6 +73,8 @@ @@ -73,6 +73,8 @@
Dynamic quota defaults must be used instead.</li>
<li>The <code>port</code> and <code>host.name</code> configurations were removed. Please use <code>listeners</code> instead.</li>
<li>The <code>advertised.port</code> and <code>advertised.host.name</code> configurations were removed. Please use <code>advertised.listeners</code> instead.</li>
<li>The deprecated worker configurations <code>rest.host.name</code> and <code>rest.port</code> were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12482">KAFKA-12482</a>) from the Kafka Connect worker configuration.
Please use <code>listeners</code> instead.</li>
</ul>
<li> The <code>Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)</code> method has been deprecated. Please use
<code>Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)</code> instead, where the <code>ConsumerGroupMetadata</code>

Loading…
Cancel
Save