Browse Source

KAFKA-3719; Allow underscores in hostname

Technically this does not strictly adhere to RFC-952 however it is valid for domain names, urls and uris so we should loosen the requirements a tad.

Author: Ryan Pridgeon <ryan.n.pridgeon@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1856 from rnpridgeon/KAFKA-3719
pull/1139/merge
Ryan Pridgeon 8 years ago committed by Ismael Juma
parent
commit
0de807357b
  1. 18
      clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
  2. 2
      clients/src/main/java/org/apache/kafka/common/utils/Utils.java
  3. 1
      clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
  4. 2
      core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
  5. 2
      core/src/main/scala/kafka/cluster/EndPoint.scala
  6. 11
      core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala

18
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java

@ -38,24 +38,26 @@ public class ClientUtils { @@ -38,24 +38,26 @@ public class ClientUtils {
public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
for (String url : urls) {
if (url != null && url.length() > 0) {
String host = getHost(url);
Integer port = getPort(url);
if (host == null || port == null)
throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
if (url != null && !url.isEmpty()) {
try {
String host = getHost(url);
Integer port = getPort(url);
if (host == null || port == null)
throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
InetSocketAddress address = new InetSocketAddress(host, port);
if (address.isUnresolved()) {
log.warn("Removing server from " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " as DNS resolution failed: " + url);
log.warn("Removing server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
} else {
addresses.add(address);
}
} catch (NumberFormatException e) {
} catch (IllegalArgumentException e) {
throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
}
}
}
if (addresses.size() < 1)
if (addresses.isEmpty())
throw new ConfigException("No resolvable bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
return addresses;
}

2
clients/src/main/java/org/apache/kafka/common/utils/Utils.java

@ -51,7 +51,7 @@ public class Utils { @@ -51,7 +51,7 @@ public class Utils {
// This matches URIs of formats: host:port and protocol:\\host:port
// IPv6 is supported with [ip] pattern
private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%.:]*)\\]?:([0-9]+)");
private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)");
public static final String NL = System.getProperty("line.separator");

1
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java

@ -38,6 +38,7 @@ public class UtilsTest { @@ -38,6 +38,7 @@ public class UtilsTest {
assertEquals("127.0.0.1", getHost("127.0.0.1:8000"));
assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080"));
assertEquals("MyDomain.com", getHost("PLAINTEXT://MyDomain.com:8080"));
assertEquals("My_Domain.com", getHost("PLAINTEXT://My_Domain.com:8080"));
assertEquals("::1", getHost("[::1]:1234"));
assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
assertEquals("2001:DB8:85A3:8D3:1319:8A2E:370:7348", getHost("PLAINTEXT://[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678"));

2
core/src/main/scala/kafka/cluster/BrokerEndPoint.scala

@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Utils._ @@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Utils._
object BrokerEndPoint {
private val uriParseExp = """\[?([0-9a-zA-Z\-%.:]*)\]?:([0-9]+)""".r
private val uriParseExp = """\[?([0-9a-zA-Z\-%._:]*)\]?:([0-9]+)""".r
/**
* BrokerEndPoint URI is host:port or [ipv6_host]:port

2
core/src/main/scala/kafka/cluster/EndPoint.scala

@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils @@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils
object EndPoint {
private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%.:]*)\]?:(-?[0-9]+)""".r
private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r
def readFrom(buffer: ByteBuffer): EndPoint = {
val port = buffer.getInt()

11
core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala

@ -97,6 +97,11 @@ class BrokerEndPointTest extends Logging { @@ -97,6 +97,11 @@ class BrokerEndPointTest extends Logging {
var endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
assert(endpoint.host == "localhost")
assert(endpoint.port == 9092)
//KAFKA-3719
connectionString = "local_host:9092"
endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
assert(endpoint.host == "local_host")
assert(endpoint.port == 9092)
// also test for ipv6
connectionString = "[::1]:9092"
endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString)
@ -121,6 +126,12 @@ class BrokerEndPointTest extends Logging { @@ -121,6 +126,12 @@ class BrokerEndPointTest extends Logging {
assert(endpoint.host == "localhost")
assert(endpoint.port == 9092)
assert(endpoint.connectionString == "PLAINTEXT://localhost:9092")
// KAFKA-3719
connectionString = "PLAINTEXT://local_host:9092"
endpoint = EndPoint.createEndPoint(connectionString)
assert(endpoint.host == "local_host")
assert(endpoint.port == 9092)
assert(endpoint.connectionString == "PLAINTEXT://local_host:9092")
// also test for default bind
connectionString = "PLAINTEXT://:9092"
endpoint = EndPoint.createEndPoint(connectionString)

Loading…
Cancel
Save