From be5edd2f8d0c0355cb33feb2ac7482b7df7dccbc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Krzysztof=20Szafra=C5=84ski?=
Date: Thu, 18 Sep 2014 15:53:48 -0700
Subject: [PATCH] =?UTF-8?q?kafka-1123;=20Broker=20IPv6=20addresses=20parse?=
 =?UTF-8?q?d=20incorrectly;=20patched=20by=20Krzysztof=20Szafra=C5=84ski;?=
 =?UTF-8?q?=20reviewed=20by=20Jun=20Rao?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../kafka/common/utils/ClientUtils.java       | 17 ++++---
 .../org/apache/kafka/common/utils/Utils.java  | 36 +++++++++++++
 .../kafka/common/utils/ClientUtilsTest.java   | 42 +++++++++++++++
 .../apache/kafka/common/utils/UtilsTest.java  | 51 +++++++++++++++++++
 .../java/kafka/etl/impl/DataGenerator.java    |  7 +--
 .../main/scala/kafka/admin/TopicCommand.scala |  4 +-
 .../main/scala/kafka/api/TopicMetadata.scala  |  4 +-
 .../main/scala/kafka/client/ClientUtils.scala | 14 ++---
 .../src/main/scala/kafka/cluster/Broker.scala |  7 +--
 .../scala/kafka/consumer/SimpleConsumer.scala |  3 +-
 .../scala/kafka/producer/SyncProducer.scala   |  8 +--
 core/src/main/scala/kafka/utils/Utils.scala   | 16 ++----
 .../scala/unit/kafka/utils/TestUtils.scala    |  4 +-
 .../scala/unit/kafka/utils/UtilsTest.scala    |  9 ++--
 .../unit/kafka/zk/EmbeddedZookeeper.scala     |  5 +-
 15 files changed, 176 insertions(+), 51 deletions(-)
 create mode 100644 clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
 create mode 100644 clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
index cb33e34978b..b987e7f0434 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
@@ -19,26 +19,31 @@ import java.util.List;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
 
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+
 public class ClientUtils {
+
     public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
         List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
         for (String url : urls) {
             if (url != null && url.length() > 0) {
-                String[] pieces = url.split(":");
-                if (pieces.length != 2)
+                String host = getHost(url);
+                Integer port = getPort(url);
+                if (host == null || port == null)
                     throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 try {
-                    InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
+                    InetSocketAddress address = new InetSocketAddress(host, port);
                     if (address.isUnresolved())
-                        throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
+                        throw new ConfigException("DNS resolution failed for url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                     addresses.add(address);
                 } catch (NumberFormatException e) {
-                    throw new ConfigException("Invalid port in metadata.broker.list: " + url);
+                    throw new ConfigException("Invalid port in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 }
             }
         }
         if (addresses.size() < 1)
-            throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
+            throw new ConfigException("No bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
         return addresses;
     }
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 50af60198a3..a0827f576e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -14,11 +14,15 @@ package org.apache.kafka.common.utils;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.kafka.common.KafkaException;
 
 public class Utils {
 
+    private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)");
+
     public static String NL = System.getProperty("line.separator");
 
     /**
@@ -217,4 +221,36 @@ public class Utils {
         return h;
     }
 
+    /**
+     * Extracts the hostname from a "host:port" address string.
+     * @param address address string to parse
+     * @return hostname or null if the given address is incorrect
+     */
+    public static String getHost(String address) {
+        Matcher matcher = HOST_PORT_PATTERN.matcher(address);
+        return matcher.matches() ? matcher.group(1) : null;
+    }
+
+    /**
+     * Extracts the port number from a "host:port" address string.
+     * @param address address string to parse
+     * @return port number or null if the given address is incorrect
+     */
+    public static Integer getPort(String address) {
+        Matcher matcher = HOST_PORT_PATTERN.matcher(address);
+        return matcher.matches() ? Integer.parseInt(matcher.group(2)) : null;
+    }
+
+    /**
+     * Formats hostname and port number as a "host:port" address string,
+     * surrounding IPv6 addresses with brackets '[', ']'
+     * @param host hostname
+     * @param port port number
+     * @return address string
+     */
+    public static String formatAddress(String host, Integer port) {
+        return host.contains(":")
+                ? "[" + host + "]:" + port // IPv6
+                : host + ":" + port;
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
new file mode 100644
index 00000000000..6e37ea553f7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class ClientUtilsTest {
+
+    @Test
+    public void testParseAndValidateAddresses() {
+        check("127.0.0.1:8000");
+        check("mydomain.com:8080");
+        check("[::1]:8000");
+        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testNoPort() {
+        check("127.0.0.1");
+    }
+
+    private void check(String... url) {
+        ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
new file mode 100644
index 00000000000..a39fab532f7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.common.utils.Utils.formatAddress;
+import static org.junit.Assert.*;
+
+public class UtilsTest {
+
+    @Test
+    public void testGetHost() {
+        assertEquals("127.0.0.1", getHost("127.0.0.1:8000"));
+        assertEquals("mydomain.com", getHost("mydomain.com:8080"));
+        assertEquals("::1", getHost("[::1]:1234"));
+        assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
+    }
+
+    @Test
+    public void testGetPort() {
+        assertEquals(8000, getPort("127.0.0.1:8000").intValue());
+        assertEquals(8080, getPort("mydomain.com:8080").intValue());
+        assertEquals(1234, getPort("[::1]:1234").intValue());
+        assertEquals(5678, getPort("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678").intValue());
+    }
+
+    @Test
+    public void testFormatAddress() {
+        assertEquals("127.0.0.1:8000", formatAddress("127.0.0.1", 8000));
+        assertEquals("mydomain.com:8080", formatAddress("mydomain.com", 8080));
+        assertEquals("[::1]:1234", formatAddress("::1", 1234));
+        assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
+    }
+}
\ No newline at end of file
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index f3fb3fd9986..d27a511fcdd 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -27,7 +27,6 @@ import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
 import kafka.etl.Props;
 import kafka.javaapi.producer.Producer;
-import kafka.message.Message;
 import kafka.producer.ProducerConfig;
 import kafka.producer.KeyedMessage;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +35,8 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.JobConf;
 
+import static org.apache.kafka.common.utils.Utils.formatAddress;
+
 /**
  * Use this class to produce test events to Kafka server. Each event contains a
  * random timestamp in text format.
@@ -70,7 +71,7 @@ public class DataGenerator {
 
         System.out.println("server uri:" + _uri.toString());
         Properties producerProps = new Properties();
-        producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
+        producerProps.put("metadata.broker.list", formatAddress(_uri.getHost(), _uri.getPort()));
         producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
         producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
         producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
@@ -108,7 +109,7 @@ public class DataGenerator {
         if (fs.exists(outPath))
             fs.delete(outPath);
         KafkaETLRequest request =
-            new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0);
+            new KafkaETLRequest(_topic, "tcp://" + formatAddress(_uri.getHost(), _uri.getPort()), 0);
 
         System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
         byte[] bytes = request.toString().getBytes("UTF-8");
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index b3f2e82d1ed..3b2166aa4e2 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,7 +19,6 @@ package kafka.admin
 
 import joptsimple._
 import java.util.Properties
-import kafka.admin.AdminOperationException
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -29,6 +28,7 @@ import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.consumer.Whitelist
 import kafka.server.OffsetManager
+import org.apache.kafka.common.utils.Utils.formatAddress
 
 object TopicCommand {
 
@@ -193,7 +193,7 @@ object TopicCommand {
       }
     }
 
-  def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+  def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
 
   def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
     val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 51380a6b0dd..0190076df0a 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -21,8 +21,8 @@ import kafka.cluster.Broker
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
-import collection.mutable.ArrayBuffer
 import kafka.common._
+import org.apache.kafka.common.utils.Utils._
 
 object TopicMetadata {
 
@@ -149,7 +149,7 @@ case class PartitionMetadata(partitionId: Int,
     partitionMetadataString.toString()
   }
 
-  private def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+  private def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
 }
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ce7ede3f6d6..ebba87f0566 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -28,6 +28,7 @@ import util.Random
 import kafka.utils.ZkUtils._
 import org.I0Itec.zkclient.ZkClient
 import java.io.IOException
+import org.apache.kafka.common.utils.Utils.{getHost, getPort}
 
 /**
  * Helper functions common to clients (producer, consumer, or admin)
@@ -85,7 +86,7 @@ object ClientUtils extends Logging{
   def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
                          correlationId: Int = 0): TopicMetadataResponse = {
     val props = new Properties()
-    props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
+    props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)
@@ -98,14 +99,9 @@ object ClientUtils extends Logging{
   def parseBrokerList(brokerListStr: String): Seq[Broker] = {
     val brokersStr = Utils.parseCsvList(brokerListStr)
-    brokersStr.zipWithIndex.map(b =>{
-      val brokerStr = b._1
-      val brokerId = b._2
-      val brokerInfos = brokerStr.split(":")
-      val hostName = brokerInfos(0)
-      val port = brokerInfos(1).toInt
-      new Broker(brokerId, hostName, port)
-    })
+    brokersStr.zipWithIndex.map { case (address, brokerId) =>
+      new Broker(brokerId, getHost(address), getPort(address))
+    }
   }
 
   /**
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index ccc3fc18184..0060add008b 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -22,6 +22,7 @@ import kafka.utils.Json
 import kafka.api.ApiUtils._
 import java.nio.ByteBuffer
 import kafka.common.{KafkaException, BrokerNotAvailableException}
+import org.apache.kafka.common.utils.Utils._
 
 /**
  * A Kafka broker
@@ -54,11 +55,11 @@ object Broker {
   }
 }
 
-case class Broker(val id: Int, val host: String, val port: Int) {
+case class Broker(id: Int, host: String, port: Int) {
 
-  override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
+  override def toString: String = "id:" + id + ",host:" + host + ",port:" + port
 
-  def getConnectionString(): String = host + ":" + port
+  def connectionString: String = formatAddress(host, port)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 8db9203d164..d349a3000fe 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -21,6 +21,7 @@ import kafka.api._
 import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
+import org.apache.kafka.common.utils.Utils._
 
 /**
  * A consumer of kafka messages
@@ -46,7 +47,7 @@ class SimpleConsumer(val host: String,
   }
 
   private def disconnect() = {
-    debug("Disconnecting from " + host + ":" + port)
+    debug("Disconnecting from " + formatAddress(host, port))
     blockingChannel.disconnect()
   }
 
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 489f0077512..42c95037509 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -22,6 +22,8 @@ import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
 
+import org.apache.kafka.common.utils.Utils._
+
 object SyncProducer {
   val RequestKey: Short = 0
   val randomGenerator = new Random
@@ -126,7 +128,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    */
  private def disconnect() {
     try {
-      info("Disconnecting from " + config.host + ":" + config.port)
+      info("Disconnecting from " + formatAddress(config.host, config.port))
       blockingChannel.disconnect()
     } catch {
       case e: Exception => error("Error on disconnect: ", e)
@@ -137,11 +139,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     if (!blockingChannel.isConnected && !shutdown) {
       try {
         blockingChannel.connect()
-        info("Connected to " + config.host + ":" + config.port + " for producing")
+        info("Connected to " + formatAddress(config.host, config.port) + " for producing")
       } catch {
         case e: Exception => {
           disconnect()
-          error("Producer connection to " + config.host + ":" + config.port + " unsuccessful", e)
+          error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful", e)
           throw e
         }
       }
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index da52b426dea..29d5a17d4a0 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -394,22 +394,14 @@ object Utils extends Logging {
     v
   }
 
-  /**
-   * Parse a host and port out of a string
-   */
-  def parseHostPort(hostport: String) : (String, Int) = {
-    val splits = hostport.split(":")
-    (splits(0), splits(1).toInt)
-  }
-
   /**
    * Get the stack trace from an exception as a string
    */
   def stackTrace(e: Throwable): String = {
-    val sw = new StringWriter;
-    val pw = new PrintWriter(sw);
-    e.printStackTrace(pw);
-    sw.toString();
+    val sw = new StringWriter
+    val pw = new PrintWriter(sw)
+    e.printStackTrace(pw)
+    sw.toString()
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c4e13c5240c..2dbdd3c2f03 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,6 +24,8 @@ import java.nio.channels._
 import java.util.Random
 import java.util.Properties
 
+import org.apache.kafka.common.utils.Utils._
+
 import collection.mutable.Map
 import collection.mutable.ListBuffer
 
@@ -142,7 +144,7 @@ object TestUtils extends Logging {
   }
 
   def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
-    configs.map(c => c.hostName + ":" + c.port).mkString(",")
+    configs.map(c => formatAddress(c.hostName, c.port)).mkString(",")
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index a50234904c2..0d0f0e2fba3 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -20,7 +20,6 @@ package kafka.utils
 import java.util.Arrays
 import java.util.concurrent.locks.ReentrantLock
 import java.nio.ByteBuffer
-import java.io._
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
@@ -73,7 +72,7 @@ class UtilsTest extends JUnitSuite {
     assertEquals(1, Utils.abs(1))
     assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
   }
-  
+
   @Test
   def testReplaceSuffix() {
     assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
@@ -81,7 +80,7 @@
     assertEquals("blah.foo", Utils.replaceSuffix("blah.foo.txt", ".txt", ""))
     assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""))
     assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"))
   }
-  
+
   @Test
   def testReadInt() {
     val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
@@ -90,7 +89,6 @@ class UtilsTest extends JUnitSuite {
       buffer.putInt(i*4, values(i))
       assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4))
     }
-
   }
 
   @Test
@@ -105,7 +103,7 @@ class UtilsTest extends JUnitSuite {
     assertTrue(emptyStringList.equals(emptyListFromNullString))
     assertTrue(emptyStringList.equals(emptyList))
   }
-  
+
   @Test
   def testInLock() {
     val lock = new ReentrantLock()
@@ -115,6 +113,5 @@
     }
     assertEquals(2, result)
     assertFalse("Should be unlocked", lock.isLocked)
-
   }
 }
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 3021a8c08bd..31515615089 100644
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -18,20 +18,19 @@
 package kafka.zk
 
 import org.apache.zookeeper.server.ZooKeeperServer
-import org.apache.zookeeper.server.NIOServerCnxn
 import org.apache.zookeeper.server.NIOServerCnxnFactory
 import kafka.utils.TestUtils
 import java.net.InetSocketAddress
 import kafka.utils.Utils
+import org.apache.kafka.common.utils.Utils.getPort
 
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
   val tickTime = 500
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
-  val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxnFactory()
-  factory.configure(new InetSocketAddress("127.0.0.1", port),0)
+  factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
   factory.startup(zookeeper)
 
   def shutdown() {