Browse Source

kafka-1123; Broker IPv6 addresses parsed incorrectly; patched by Krzysztof Szafrański; reviewed by Jun Rao

pull/34/head
Krzysztof Szafrański 10 years ago committed by Jun Rao
parent
commit
be5edd2f8d
  1. 17
      clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
  2. 36
      clients/src/main/java/org/apache/kafka/common/utils/Utils.java
  3. 42
      clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
  4. 51
      clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
  5. 7
      contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
  6. 4
      core/src/main/scala/kafka/admin/TopicCommand.scala
  7. 4
      core/src/main/scala/kafka/api/TopicMetadata.scala
  8. 14
      core/src/main/scala/kafka/client/ClientUtils.scala
  9. 7
      core/src/main/scala/kafka/cluster/Broker.scala
  10. 3
      core/src/main/scala/kafka/consumer/SimpleConsumer.scala
  11. 8
      core/src/main/scala/kafka/producer/SyncProducer.scala
  12. 16
      core/src/main/scala/kafka/utils/Utils.scala
  13. 4
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  14. 9
      core/src/test/scala/unit/kafka/utils/UtilsTest.scala
  15. 5
      core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala

17
clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java

@ -19,26 +19,31 @@ import java.util.List; @@ -19,26 +19,31 @@ import java.util.List;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
public class ClientUtils {
public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
for (String url : urls) {
if (url != null && url.length() > 0) {
String[] pieces = url.split(":");
if (pieces.length != 2)
String host = getHost(url);
Integer port = getPort(url);
if (host == null || port == null)
throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
try {
InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
InetSocketAddress address = new InetSocketAddress(host, port);
if (address.isUnresolved())
throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
throw new ConfigException("DNS resolution failed for url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
addresses.add(address);
} catch (NumberFormatException e) {
throw new ConfigException("Invalid port in metadata.broker.list: " + url);
throw new ConfigException("Invalid port in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
}
}
}
if (addresses.size() < 1)
throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
throw new ConfigException("No bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
return addresses;
}
}

36
clients/src/main/java/org/apache/kafka/common/utils/Utils.java

@ -14,11 +14,15 @@ package org.apache.kafka.common.utils; @@ -14,11 +14,15 @@ package org.apache.kafka.common.utils;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.KafkaException;
public class Utils {
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)");
public static String NL = System.getProperty("line.separator");
/**
@ -217,4 +221,36 @@ public class Utils { @@ -217,4 +221,36 @@ public class Utils {
return h;
}
/**
* Extracts the hostname from a "host:port" address string.
* @param address address string to parse
* @return hostname or null if the given address is incorrect
*/
public static String getHost(String address) {
Matcher matcher = HOST_PORT_PATTERN.matcher(address);
return matcher.matches() ? matcher.group(1) : null;
}
/**
* Extracts the port number from a "host:port" address string.
* @param address address string to parse
* @return port number or null if the given address is incorrect
*/
public static Integer getPort(String address) {
Matcher matcher = HOST_PORT_PATTERN.matcher(address);
return matcher.matches() ? Integer.parseInt(matcher.group(2)) : null;
}
/**
* Formats hostname and port number as a "host:port" address string,
* surrounding IPv6 addresses with braces '[', ']'
* @param host hostname
* @param port port number
* @return address string
*/
public static String formatAddress(String host, Integer port) {
return host.contains(":")
? "[" + host + "]:" + port // IPv6
: host + ":" + port;
}
}

42
clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java

@ -0,0 +1,42 @@ @@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import org.apache.kafka.common.config.ConfigException;
import org.junit.Test;
import java.util.Arrays;
public class ClientUtilsTest {
@Test
public void testParseAndValidateAddresses() {
check("127.0.0.1:8000");
check("mydomain.com:8080");
check("[::1]:8000");
check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
}
@Test(expected = ConfigException.class)
public void testNoPort() {
check("127.0.0.1");
}
private void check(String... url) {
ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
}
}

51
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java

@ -0,0 +1,51 @@ @@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;
import org.junit.Test;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.junit.Assert.*;
public class UtilsTest {
@Test
public void testGetHost() {
assertEquals("127.0.0.1", getHost("127.0.0.1:8000"));
assertEquals("mydomain.com", getHost("mydomain.com:8080"));
assertEquals("::1", getHost("[::1]:1234"));
assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
}
@Test
public void testGetPort() {
assertEquals(8000, getPort("127.0.0.1:8000").intValue());
assertEquals(8080, getPort("mydomain.com:8080").intValue());
assertEquals(1234, getPort("[::1]:1234").intValue());
assertEquals(5678, getPort("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678").intValue());
}
@Test
public void testFormatAddress() {
assertEquals("127.0.0.1:8000", formatAddress("127.0.0.1", 8000));
assertEquals("mydomain.com:8080", formatAddress("mydomain.com", 8080));
assertEquals("[::1]:1234", formatAddress("::1", 1234));
assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
}
}

7
contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java

@ -27,7 +27,6 @@ import kafka.etl.KafkaETLKey; @@ -27,7 +27,6 @@ import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.producer.Producer;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.producer.KeyedMessage;
import org.apache.hadoop.fs.FileSystem;
@ -36,6 +35,8 @@ import org.apache.hadoop.io.BytesWritable; @@ -36,6 +35,8 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
import static org.apache.kafka.common.utils.Utils.formatAddress;
/**
* Use this class to produce test events to Kafka server. Each event contains a
* random timestamp in text format.
@ -70,7 +71,7 @@ public class DataGenerator { @@ -70,7 +71,7 @@ public class DataGenerator {
System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
producerProps.put("metadata.broker.list", formatAddress(_uri.getHost(), _uri.getPort()));
producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
@ -108,7 +109,7 @@ public class DataGenerator { @@ -108,7 +109,7 @@ public class DataGenerator {
if (fs.exists(outPath)) fs.delete(outPath);
KafkaETLRequest request =
new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0);
new KafkaETLRequest(_topic, "tcp://" + formatAddress(_uri.getHost(), _uri.getPort()), 0);
System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
byte[] bytes = request.toString().getBytes("UTF-8");

4
core/src/main/scala/kafka/admin/TopicCommand.scala

@ -19,7 +19,6 @@ package kafka.admin @@ -19,7 +19,6 @@ package kafka.admin
import joptsimple._
import java.util.Properties
import kafka.admin.AdminOperationException
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@ -29,6 +28,7 @@ import kafka.cluster.Broker @@ -29,6 +28,7 @@ import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.consumer.Whitelist
import kafka.server.OffsetManager
import org.apache.kafka.common.utils.Utils.formatAddress
object TopicCommand {
@ -193,7 +193,7 @@ object TopicCommand { @@ -193,7 +193,7 @@ object TopicCommand {
}
}
def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))

4
core/src/main/scala/kafka/api/TopicMetadata.scala

@ -21,8 +21,8 @@ import kafka.cluster.Broker @@ -21,8 +21,8 @@ import kafka.cluster.Broker
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.utils.Logging
import collection.mutable.ArrayBuffer
import kafka.common._
import org.apache.kafka.common.utils.Utils._
object TopicMetadata {
@ -149,7 +149,7 @@ case class PartitionMetadata(partitionId: Int, @@ -149,7 +149,7 @@ case class PartitionMetadata(partitionId: Int,
partitionMetadataString.toString()
}
private def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
private def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
}

14
core/src/main/scala/kafka/client/ClientUtils.scala

@ -28,6 +28,7 @@ import util.Random @@ -28,6 +28,7 @@ import util.Random
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.ZkClient
import java.io.IOException
import org.apache.kafka.common.utils.Utils.{getHost, getPort}
/**
* Helper functions common to clients (producer, consumer, or admin)
@ -85,7 +86,7 @@ object ClientUtils extends Logging{ @@ -85,7 +86,7 @@ object ClientUtils extends Logging{
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
correlationId: Int = 0): TopicMetadataResponse = {
val props = new Properties()
props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
props.put("client.id", clientId)
props.put("request.timeout.ms", timeoutMs.toString)
val producerConfig = new ProducerConfig(props)
@ -98,14 +99,9 @@ object ClientUtils extends Logging{ @@ -98,14 +99,9 @@ object ClientUtils extends Logging{
def parseBrokerList(brokerListStr: String): Seq[Broker] = {
val brokersStr = Utils.parseCsvList(brokerListStr)
brokersStr.zipWithIndex.map(b =>{
val brokerStr = b._1
val brokerId = b._2
val brokerInfos = brokerStr.split(":")
val hostName = brokerInfos(0)
val port = brokerInfos(1).toInt
new Broker(brokerId, hostName, port)
})
brokersStr.zipWithIndex.map { case (address, brokerId) =>
new Broker(brokerId, getHost(address), getPort(address))
}
}
/**

7
core/src/main/scala/kafka/cluster/Broker.scala

@ -22,6 +22,7 @@ import kafka.utils.Json @@ -22,6 +22,7 @@ import kafka.utils.Json
import kafka.api.ApiUtils._
import java.nio.ByteBuffer
import kafka.common.{KafkaException, BrokerNotAvailableException}
import org.apache.kafka.common.utils.Utils._
/**
* A Kafka broker
@ -54,11 +55,11 @@ object Broker { @@ -54,11 +55,11 @@ object Broker {
}
}
case class Broker(val id: Int, val host: String, val port: Int) {
case class Broker(id: Int, host: String, port: Int) {
override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
override def toString: String = "id:" + id + ",host:" + host + ",port:" + port
def getConnectionString(): String = host + ":" + port
def connectionString: String = formatAddress(host, port)
def writeTo(buffer: ByteBuffer) {
buffer.putInt(id)

3
core/src/main/scala/kafka/consumer/SimpleConsumer.scala

@ -21,6 +21,7 @@ import kafka.api._ @@ -21,6 +21,7 @@ import kafka.api._
import kafka.network._
import kafka.utils._
import kafka.common.{ErrorMapping, TopicAndPartition}
import org.apache.kafka.common.utils.Utils._
/**
* A consumer of kafka messages
@ -46,7 +47,7 @@ class SimpleConsumer(val host: String, @@ -46,7 +47,7 @@ class SimpleConsumer(val host: String,
}
private def disconnect() = {
debug("Disconnecting from " + host + ":" + port)
debug("Disconnecting from " + formatAddress(host, port))
blockingChannel.disconnect()
}

8
core/src/main/scala/kafka/producer/SyncProducer.scala

@ -22,6 +22,8 @@ import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive} @@ -22,6 +22,8 @@ import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
import kafka.utils._
import java.util.Random
import org.apache.kafka.common.utils.Utils._
object SyncProducer {
val RequestKey: Short = 0
val randomGenerator = new Random
@ -126,7 +128,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { @@ -126,7 +128,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
*/
private def disconnect() {
try {
info("Disconnecting from " + config.host + ":" + config.port)
info("Disconnecting from " + formatAddress(config.host, config.port))
blockingChannel.disconnect()
} catch {
case e: Exception => error("Error on disconnect: ", e)
@ -137,11 +139,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { @@ -137,11 +139,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
if (!blockingChannel.isConnected && !shutdown) {
try {
blockingChannel.connect()
info("Connected to " + config.host + ":" + config.port + " for producing")
info("Connected to " + formatAddress(config.host, config.port) + " for producing")
} catch {
case e: Exception => {
disconnect()
error("Producer connection to " + config.host + ":" + config.port + " unsuccessful", e)
error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful", e)
throw e
}
}

16
core/src/main/scala/kafka/utils/Utils.scala

@ -394,22 +394,14 @@ object Utils extends Logging { @@ -394,22 +394,14 @@ object Utils extends Logging {
v
}
/**
* Parse a host and port out of a string
*/
def parseHostPort(hostport: String) : (String, Int) = {
val splits = hostport.split(":")
(splits(0), splits(1).toInt)
}
/**
* Get the stack trace from an exception as a string
*/
def stackTrace(e: Throwable): String = {
val sw = new StringWriter;
val pw = new PrintWriter(sw);
e.printStackTrace(pw);
sw.toString();
val sw = new StringWriter
val pw = new PrintWriter(sw)
e.printStackTrace(pw)
sw.toString()
}
/**

4
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -24,6 +24,8 @@ import java.nio.channels._ @@ -24,6 +24,8 @@ import java.nio.channels._
import java.util.Random
import java.util.Properties
import org.apache.kafka.common.utils.Utils._
import collection.mutable.Map
import collection.mutable.ListBuffer
@ -142,7 +144,7 @@ object TestUtils extends Logging { @@ -142,7 +144,7 @@ object TestUtils extends Logging {
}
def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
configs.map(c => c.hostName + ":" + c.port).mkString(",")
configs.map(c => formatAddress(c.hostName, c.port)).mkString(",")
}
/**

9
core/src/test/scala/unit/kafka/utils/UtilsTest.scala

@ -20,7 +20,6 @@ package kafka.utils @@ -20,7 +20,6 @@ package kafka.utils
import java.util.Arrays
import java.util.concurrent.locks.ReentrantLock
import java.nio.ByteBuffer
import java.io._
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Assert._
@ -73,7 +72,7 @@ class UtilsTest extends JUnitSuite { @@ -73,7 +72,7 @@ class UtilsTest extends JUnitSuite {
assertEquals(1, Utils.abs(1))
assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
}
@Test
def testReplaceSuffix() {
assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
@ -81,7 +80,7 @@ class UtilsTest extends JUnitSuite { @@ -81,7 +80,7 @@ class UtilsTest extends JUnitSuite {
assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""))
assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"))
}
@Test
def testReadInt() {
val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
@ -90,7 +89,6 @@ class UtilsTest extends JUnitSuite { @@ -90,7 +89,6 @@ class UtilsTest extends JUnitSuite {
buffer.putInt(i*4, values(i))
assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4))
}
}
@Test
@ -105,7 +103,7 @@ class UtilsTest extends JUnitSuite { @@ -105,7 +103,7 @@ class UtilsTest extends JUnitSuite {
assertTrue(emptyStringList.equals(emptyListFromNullString))
assertTrue(emptyStringList.equals(emptyList))
}
@Test
def testInLock() {
val lock = new ReentrantLock()
@ -115,6 +113,5 @@ class UtilsTest extends JUnitSuite { @@ -115,6 +113,5 @@ class UtilsTest extends JUnitSuite {
}
assertEquals(2, result)
assertFalse("Should be unlocked", lock.isLocked)
}
}

5
core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala

@ -18,20 +18,19 @@ @@ -18,20 +18,19 @@
package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxn
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils
import java.net.InetSocketAddress
import kafka.utils.Utils
import org.apache.kafka.common.utils.Utils.getPort
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
val tickTime = 500
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val port = connectString.split(":")(1).toInt
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress("127.0.0.1", port),0)
factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
factory.startup(zookeeper)
def shutdown() {

Loading…
Cancel
Save