diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 5beb6c67d41..8bec3c8db66 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -28,7 +28,7 @@ import java.io.PrintStream
 import kafka.message._
 import kafka.utils.Utils
 import kafka.utils.ZkUtils
-import kafka.utils.StringSerializer
+import kafka.utils.ZKStringSerializer
 
 /**
  * Consumer that dumps messages out to standard out.
@@ -200,7 +200,7 @@ object ConsoleConsumer {
     try {
       val dir = "/consumers/" + groupId
       logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a7fd85566e3..56fdbb359f1 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -92,8 +92,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // queues : (topic,consumerThreadId) -> queue
   private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
-  connectZk
-  createFetcher
+  connectZk()
+  createFetcher()
   if (config.autoCommit) {
     logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
     scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
@@ -112,7 +112,7 @@
 
   private def connectZk() {
     logger.info("Connecting to zookeeper instance at " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
   }
 
   def shutdown() {
@@ -120,12 +120,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     if (canShutdown) {
       logger.info("ZKConsumerConnector shutting down")
       try {
-        scheduler.shutdown
+        scheduler.shutdown()
         fetcher match {
-          case Some(f) => f.shutdown
+          case Some(f) => f.shutdown()
           case None =>
         }
-        sendShudownToAllQueues
+        sendShudownToAllQueues()
+        if (config.autoCommit)
+          commitOffsets()
         if (zkClient != null) {
           zkClient.close()
           zkClient = null
@@ -186,7 +188,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
 
     // explicitly trigger load balancing for this consumer
-    loadBalancerListener.syncedRebalance
+    loadBalancerListener.syncedRebalance()
 
     ret
   }
@@ -199,7 +201,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private def sendShudownToAllQueues() = {
     for (queue <- queues.values) {
       logger.debug("Clearing up queue")
-      queue.clear
+      queue.clear()
       queue.put(ZookeeperConsumerConnector.shutdownCommand)
       logger.debug("Cleared queue and sent shutdown command")
     }
@@ -209,7 +211,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     if(logger.isTraceEnabled)
       logger.trace("auto committing")
     try {
-      commitOffsets
+      commitOffsets()
     }
     catch {
       case t: Throwable =>
@@ -419,7 +421,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i)
         var done = false
         try {
-          done = rebalance
+          done = rebalance()
         }
         catch {
           case e =>
@@ -432,8 +434,8 @@
         if (done)
           return
         // release all partitions, reset state and retry
-        releasePartitionOwnership
-        resetState
+        releasePartitionOwnership()
+        resetState()
         Thread.sleep(config.zkSyncTimeMs)
       }
     }
@@ -462,7 +464,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       commitOffsets
 
       logger.info("Releasing partition ownership")
-      releasePartitionOwnership
+      releasePartitionOwnership()
 
       val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
       for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
diff --git a/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
index 6041581cde3..a652ab6b075 100644
--- a/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
@@ -15,7 +15,7 @@
  */
 package kafka.producer
 
-import kafka.utils.{StringSerializer, ZkUtils, ZKConfig}
+import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
 import collection.mutable.HashMap
 import collection.mutable.Map
 import org.apache.log4j.Logger
@@ -59,7 +59,7 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
   private val logger = Logger.getLogger(classOf[ZKBrokerPartitionInfo])
   private val zkWatcherLock = new Object
   private val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                                      StringSerializer)
+                                      ZKStringSerializer)
   // maintain a map from topic -> list of (broker, num_partitions) from zookeeper
   private var topicBrokerPartitions = getZKTopicPartitionInfo
   // maintain a map from broker id to the corresponding Broker object
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 2f08f52f740..90c3db12217 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -42,7 +42,7 @@ class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) {
   def startup() {
     /* start client */
     logger.info("connecting to ZK: " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     zkClient.subscribeStateChanges(new SessionExpireListener)
   }
 
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 992141d873d..eb965748dbc 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -9,7 +9,7 @@ import kafka.producer.async.DefaultEventHandler
 import kafka.serializer.{DefaultEncoder, StringEncoder}
 import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{StringSerializer, Utils}
+import kafka.utils.{ZKStringSerializer, Utils}
 import kafka.api.OffsetRequest
 import org.I0Itec.zkclient._
 import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
@@ -131,7 +131,7 @@ object ReplayLogProducer {
     try {
       val dir = "/consumers/" + groupId
       logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
diff --git a/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
index c83475de580..779fb5b10f5 100644
--- a/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
@@ -33,7 +33,7 @@ object UpdateOffsetsInZK {
       usage
     val config = new ConsumerConfig(Utils.loadProps(args(1)))
     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
-                                config.zkConnectionTimeoutMs, StringSerializer)
+                                config.zkConnectionTimeoutMs, ZKStringSerializer)
     args(0) match {
       case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
       case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 3a86175a43e..fbd3fb30846 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -239,7 +239,7 @@ object ZkUtils {
   }
 }
 
-object StringSerializer extends ZkSerializer {
+object ZKStringSerializer extends ZkSerializer {
   @throws(classOf[ZkMarshallingError])
   def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
 
diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala
index c237cc983a9..c9fdba57da1 100644
--- a/core/src/test/scala/other/kafka/DeleteZKPath.scala
+++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala
@@ -17,7 +17,7 @@
 package kafka
 
 import consumer.ConsumerConfig
-import utils.{StringSerializer, ZkUtils, Utils}
+import utils.{ZKStringSerializer, ZkUtils, Utils}
 import org.I0Itec.zkclient.ZkClient
 
 object DeleteZKPath {
@@ -31,7 +31,7 @@ object DeleteZKPath {
     val zkPath = args(1)
 
     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                                StringSerializer)
+                                ZKStringSerializer)
 
     try {
       ZkUtils.deletePathRecursive(zkClient, zkPath);
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b7dfb409d2e..24242a77591 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -209,7 +209,7 @@ object TestUtils {
    * Throw an exception if an iterable has different length than expected
    *
    */
-  def checkLength[T](s1: Iterator[T], expectedLength:Integer) {
+  def checkLength[T](s1: Iterator[T], expectedLength:Int) {
     var n = 0
     while (s1.hasNext) {
       n+=1
@@ -283,7 +283,7 @@ object TestUtils {
   }
 
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
-    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 9f06840bc72..e5cc6ec299f 100644
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -21,7 +21,7 @@ import org.apache.zookeeper.server.NIOServerCnxn
 import kafka.utils.TestUtils
 import org.I0Itec.zkclient.ZkClient
 import java.net.InetSocketAddress
-import kafka.utils.{Utils, StringSerializer}
+import kafka.utils.{Utils, ZKStringSerializer}
 
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
@@ -31,7 +31,7 @@ class EmbeddedZookeeper(val connectString: String) {
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
   factory.startup(zookeeper)
   val client = new ZkClient(connectString)
-  client.setZkSerializer(StringSerializer)
+  client.setZkSerializer(ZKStringSerializer)
 
   def shutdown() {
     factory.shutdown()
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 850504b976b..f04f6ebde36 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -18,7 +18,7 @@ package kafka.zk
 
 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, StringSerializer}
+import kafka.utils.{ZkUtils, ZKStringSerializer}
 import kafka.utils.{TestZKUtils, TestUtils}
 import org.junit.Assert
 import org.scalatest.junit.JUnit3Suite
@@ -30,7 +30,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testEphemeralNodeCleanup = {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                                StringSerializer)
+                                ZKStringSerializer)
 
     try {
       ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
@@ -48,7 +48,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
     Thread.sleep(zkSessionTimeoutMs)
 
     zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-                            StringSerializer)
+                            ZKStringSerializer)
 
     val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
     Assert.assertFalse(nodeExists)
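
Note on usage (not part of the patch): every call site above passes the renamed ZKStringSerializer into the ZkClient constructor so that znode data is written, and read back, as UTF-8 strings rather than raw bytes. A minimal sketch of that wiring, assuming a placeholder connect string and the same 30-second timeouts the tools above use:

import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer

object ZkSerializerExample {
  def main(args: Array[String]) {
    // "localhost:2181" is a placeholder; the two ints are the session and
    // connection timeouts in milliseconds, as in ConsoleConsumer above.
    val zk = new ZkClient("localhost:2181", 30*1000, 30*1000, ZKStringSerializer)
    zk.createPersistent("/example", "node created")  // value serialized to UTF-8 bytes
    val value: String = zk.readData("/example")      // bytes deserialized back to a String
    println(value)
    zk.deleteRecursive("/example")
    zk.close()
  }
}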