diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 0c12ea13450..2672b701aaf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -38,24 +38,26 @@ public class ClientUtils { public static List parseAndValidateAddresses(List urls) { List addresses = new ArrayList(); for (String url : urls) { - if (url != null && url.length() > 0) { - String host = getHost(url); - Integer port = getPort(url); - if (host == null || port == null) - throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + if (url != null && !url.isEmpty()) { try { + String host = getHost(url); + Integer port = getPort(url); + if (host == null || port == null) + throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + InetSocketAddress address = new InetSocketAddress(host, port); + if (address.isUnresolved()) { - log.warn("Removing server from " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + " as DNS resolution failed: " + url); + log.warn("Removing server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); } else { addresses.add(address); } - } catch (NumberFormatException e) { + } catch (IllegalArgumentException e) { throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); } } } - if (addresses.size() < 1) + if (addresses.isEmpty()) throw new ConfigException("No resolvable bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); return addresses; } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 8d7014a705a..1bbfea93f10 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -51,7 +51,7 @@ public class Utils { // This matches URIs of formats: host:port and protocol:\\host:port // IPv6 is supported with [ip] pattern - private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%.:]*)\\]?:([0-9]+)"); + private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)"); public static final String NL = System.getProperty("line.separator"); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 1af7e43f8e9..46400b4a634 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -38,6 +38,7 @@ public class UtilsTest { assertEquals("127.0.0.1", getHost("127.0.0.1:8000")); assertEquals("mydomain.com", getHost("PLAINTEXT://mydomain.com:8080")); assertEquals("MyDomain.com", getHost("PLAINTEXT://MyDomain.com:8080")); + assertEquals("My_Domain.com", getHost("PLAINTEXT://My_Domain.com:8080")); assertEquals("::1", getHost("[::1]:1234")); assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678")); assertEquals("2001:DB8:85A3:8D3:1319:8A2E:370:7348", getHost("PLAINTEXT://[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678")); diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala index 99cf66688b0..91823f086c4 100644 --- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala +++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.Utils._ object BrokerEndPoint { - private val uriParseExp = """\[?([0-9a-zA-Z\-%.:]*)\]?:([0-9]+)""".r + private val uriParseExp = """\[?([0-9a-zA-Z\-%._:]*)\]?:([0-9]+)""".r /** * BrokerEndPoint URI is host:port or [ipv6_host]:port diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala index eb45aae54c5..720d8196266 100644 --- a/core/src/main/scala/kafka/cluster/EndPoint.scala +++ b/core/src/main/scala/kafka/cluster/EndPoint.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils object EndPoint { - private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%.:]*)\]?:(-?[0-9]+)""".r + private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r def readFrom(buffer: ByteBuffer): EndPoint = { val port = buffer.getInt() diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala index cec8fec1f1f..5554b39eac5 100644 --- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala @@ -97,6 +97,11 @@ class BrokerEndPointTest extends Logging { var endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString) assert(endpoint.host == "localhost") assert(endpoint.port == 9092) + //KAFKA-3719 + connectionString = "local_host:9092" + endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString) + assert(endpoint.host == "local_host") + assert(endpoint.port == 9092) // also test for ipv6 connectionString = "[::1]:9092" endpoint = BrokerEndPoint.createBrokerEndPoint(1, connectionString) @@ -121,6 +126,12 @@ class BrokerEndPointTest extends Logging { assert(endpoint.host == "localhost") assert(endpoint.port == 9092) assert(endpoint.connectionString == "PLAINTEXT://localhost:9092") + // KAFKA-3719 + connectionString = "PLAINTEXT://local_host:9092" + endpoint = EndPoint.createEndPoint(connectionString) + assert(endpoint.host == "local_host") + assert(endpoint.port == 9092) + assert(endpoint.connectionString == "PLAINTEXT://local_host:9092") // also test for default bind connectionString = "PLAINTEXT://:9092" endpoint = EndPoint.createEndPoint(connectionString)