Browse Source

commit offset before consumer shutdown KAFKA-84; rename util.StringSerializer to ZKStringSerializer to avoid confusion with producer.StringSerializer

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1154719 13f79535-47bb-0310-9956-ffa450edef68
0.7.0
Jun Rao 13 years ago
parent
commit
96b8e03dd1
  1. 4
      core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
  2. 28
      core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
  3. 4
      core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
  4. 2
      core/src/main/scala/kafka/server/KafkaZooKeeper.scala
  5. 4
      core/src/main/scala/kafka/tools/ReplayLogProducer.scala
  6. 2
      core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
  7. 2
      core/src/main/scala/kafka/utils/ZkUtils.scala
  8. 4
      core/src/test/scala/other/kafka/DeleteZKPath.scala
  9. 4
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  10. 4
      core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
  11. 6
      core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala

4
core/src/main/scala/kafka/consumer/ConsoleConsumer.scala

@@ -28,7 +28,7 @@ import java.io.PrintStream
import kafka.message._
import kafka.utils.Utils
import kafka.utils.ZkUtils
import kafka.utils.StringSerializer
import kafka.utils.ZKStringSerializer
/**
* Consumer that dumps messages out to standard out.
@@ -200,7 +200,7 @@ object ConsoleConsumer {
try {
val dir = "/consumers/" + groupId
logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
zk.close()
} catch {

28
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

@@ -92,8 +92,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// queues : (topic,consumerThreadId) -> queue
private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
connectZk
createFetcher
connectZk()
createFetcher()
if (config.autoCommit) {
logger.info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
@@ -112,7 +112,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def connectZk() {
logger.info("Connecting to zookeeper instance at " + config.zkConnect)
zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
}
def shutdown() {
@@ -120,12 +120,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (canShutdown) {
logger.info("ZKConsumerConnector shutting down")
try {
scheduler.shutdown
scheduler.shutdown()
fetcher match {
case Some(f) => f.shutdown
case Some(f) => f.shutdown()
case None =>
}
sendShudownToAllQueues
sendShudownToAllQueues()
if (config.autoCommit)
commitOffsets()
if (zkClient != null) {
zkClient.close()
zkClient = null
@@ -186,7 +188,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance
loadBalancerListener.syncedRebalance()
ret
}
@@ -199,7 +201,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def sendShudownToAllQueues() = {
for (queue <- queues.values) {
logger.debug("Clearing up queue")
queue.clear
queue.clear()
queue.put(ZookeeperConsumerConnector.shutdownCommand)
logger.debug("Cleared queue and sent shutdown command")
}
@@ -209,7 +211,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if(logger.isTraceEnabled)
logger.trace("auto committing")
try {
commitOffsets
commitOffsets()
}
catch {
case t: Throwable =>
@@ -419,7 +421,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
logger.info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
try {
done = rebalance
done = rebalance()
}
catch {
case e =>
@@ -432,8 +434,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (done)
return
// release all partitions, reset state and retry
releasePartitionOwnership
resetState
releasePartitionOwnership()
resetState()
Thread.sleep(config.zkSyncTimeMs)
}
}
@@ -462,7 +464,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
commitOffsets
logger.info("Releasing partition ownership")
releasePartitionOwnership
releasePartitionOwnership()
val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {

4
core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala

@@ -15,7 +15,7 @@
*/
package kafka.producer
import kafka.utils.{StringSerializer, ZkUtils, ZKConfig}
import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
import collection.mutable.HashMap
import collection.mutable.Map
import org.apache.log4j.Logger
@@ -59,7 +59,7 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
private val logger = Logger.getLogger(classOf[ZKBrokerPartitionInfo])
private val zkWatcherLock = new Object
private val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
StringSerializer)
ZKStringSerializer)
// maintain a map from topic -> list of (broker, num_partitions) from zookeeper
private var topicBrokerPartitions = getZKTopicPartitionInfo
// maintain a map from broker id to the corresponding Broker object

2
core/src/main/scala/kafka/server/KafkaZooKeeper.scala

@@ -42,7 +42,7 @@ class KafkaZooKeeper(config: KafkaConfig, logManager: LogManager) {
def startup() {
/* start client */
logger.info("connecting to ZK: " + config.zkConnect)
zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
zkClient.subscribeStateChanges(new SessionExpireListener)
}

4
core/src/main/scala/kafka/tools/ReplayLogProducer.scala

@@ -9,7 +9,7 @@ import kafka.producer.async.DefaultEventHandler
import kafka.serializer.{DefaultEncoder, StringEncoder}
import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
import kafka.consumer._
import kafka.utils.{StringSerializer, Utils}
import kafka.utils.{ZKStringSerializer, Utils}
import kafka.api.OffsetRequest
import org.I0Itec.zkclient._
import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
@@ -131,7 +131,7 @@ object ReplayLogProducer {
try {
val dir = "/consumers/" + groupId
logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, StringSerializer)
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
zk.close()
} catch {

2
core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala

@@ -33,7 +33,7 @@ object UpdateOffsetsInZK {
usage
val config = new ConsumerConfig(Utils.loadProps(args(1)))
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs, StringSerializer)
config.zkConnectionTimeoutMs, ZKStringSerializer)
args(0) match {
case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))

2
core/src/main/scala/kafka/utils/ZkUtils.scala

@@ -239,7 +239,7 @@ object ZkUtils {
}
}
object StringSerializer extends ZkSerializer {
object ZKStringSerializer extends ZkSerializer {
@throws(classOf[ZkMarshallingError])
def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")

4
core/src/test/scala/other/kafka/DeleteZKPath.scala

@@ -17,7 +17,7 @@
package kafka
import consumer.ConsumerConfig
import utils.{StringSerializer, ZkUtils, Utils}
import utils.{ZKStringSerializer, ZkUtils, Utils}
import org.I0Itec.zkclient.ZkClient
object DeleteZKPath {
@@ -31,7 +31,7 @@ object DeleteZKPath {
val zkPath = args(1)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
StringSerializer)
ZKStringSerializer)
try {
ZkUtils.deletePathRecursive(zkClient, zkPath);

4
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -209,7 +209,7 @@ object TestUtils {
* Throw an exception if an iterable has different length than expected
*
*/
def checkLength[T](s1: Iterator[T], expectedLength:Integer) {
def checkLength[T](s1: Iterator[T], expectedLength:Int) {
var n = 0
while (s1.hasNext) {
n+=1
@@ -283,7 +283,7 @@ object TestUtils {
}
def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
}

4
core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala

@@ -21,7 +21,7 @@ import org.apache.zookeeper.server.NIOServerCnxn
import kafka.utils.TestUtils
import org.I0Itec.zkclient.ZkClient
import java.net.InetSocketAddress
import kafka.utils.{Utils, StringSerializer}
import kafka.utils.{Utils, ZKStringSerializer}
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
@@ -31,7 +31,7 @@ class EmbeddedZookeeper(val connectString: String) {
val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
factory.startup(zookeeper)
val client = new ZkClient(connectString)
client.setZkSerializer(StringSerializer)
client.setZkSerializer(ZKStringSerializer)
def shutdown() {
factory.shutdown()

6
core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala

@@ -18,7 +18,7 @@ package kafka.zk
import kafka.consumer.ConsumerConfig
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, StringSerializer}
import kafka.utils.{ZkUtils, ZKStringSerializer}
import kafka.utils.{TestZKUtils, TestUtils}
import org.junit.Assert
import org.scalatest.junit.JUnit3Suite
@@ -30,7 +30,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
def testEphemeralNodeCleanup = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
StringSerializer)
ZKStringSerializer)
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
@@ -48,7 +48,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
Thread.sleep(zkSessionTimeoutMs)
zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
StringSerializer)
ZKStringSerializer)
val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
Assert.assertFalse(nodeExists)

Loading…
Cancel
Save