From a0852d477ec097bd4efe120c21de020fccf6dc49 Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Wed, 21 Jan 2015 17:25:54 -0800
Subject: [PATCH] KAFKA-1848; check consumer shutting down flag inside retry
 loop; reviewed by Guozhang Wang

---
 .../consumer/ZookeeperConsumerConnector.scala | 57 +++++++++----------
 1 file changed, 28 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 191a8677444..5487259751e 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -613,36 +613,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def syncedRebalance() {
     rebalanceLock synchronized {
       rebalanceTimer.time {
-        if(isShuttingDown.get()) {
-          return
-        } else {
-          for (i <- 0 until config.rebalanceMaxRetries) {
-            info("begin rebalancing consumer " + consumerIdString + " try #" + i)
-            var done = false
-            var cluster: Cluster = null
-            try {
-              cluster = getCluster(zkClient)
-              done = rebalance(cluster)
-            } catch {
-              case e: Throwable =>
-                /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-                  * For example, a ZK node can disappear between the time we get all children and the time we try to get
-                  * the value of a child. Just let this go since another rebalance will be triggered.
-                  **/
-                info("exception during rebalance ", e)
-            }
-            info("end rebalancing consumer " + consumerIdString + " try #" + i)
-            if (done) {
-              return
-            } else {
-              /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
-               * clear the cache */
-              info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
-            }
-            // stop all fetchers and clear all the queues to avoid data duplication
-            closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
-            Thread.sleep(config.rebalanceBackoffMs)
+        for (i <- 0 until config.rebalanceMaxRetries) {
+          if(isShuttingDown.get()) {
+            return
+          }
+          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+          var done = false
+          var cluster: Cluster = null
+          try {
+            cluster = getCluster(zkClient)
+            done = rebalance(cluster)
+          } catch {
+            case e: Throwable =>
+              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+                * For example, a ZK node can disappear between the time we get all children and the time we try to get
+                * the value of a child. Just let this go since another rebalance will be triggered.
+                **/
+              info("exception during rebalance ", e)
+          }
+          info("end rebalancing consumer " + consumerIdString + " try #" + i)
+          if (done) {
+            return
+          } else {
+            /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
+             * clear the cache */
+            info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
           }
+          // stop all fetchers and clear all the queues to avoid data duplication
+          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+          Thread.sleep(config.rebalanceBackoffMs)
         }
       }
     }
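
The old code tested isShuttingDown once, before entering the retry loop, so a
consumer shut down while rebalancing was failing and backing off would keep
retrying until config.rebalanceMaxRetries was exhausted. Moving the check to
the top of each iteration bounds shutdown latency to at most one rebalance
attempt plus one backoff sleep. Below is a minimal, self-contained sketch of
that pattern, not the actual ZookeeperConsumerConnector code; the names
maxRetries, backoffMs, and attemptRebalance are illustrative stand-ins:

import java.util.concurrent.atomic.AtomicBoolean

// Sketch of the retry pattern this patch adopts: the shutdown flag is
// re-checked on every attempt, not just once before the loop. All members
// here are illustrative, not the real ZookeeperConsumerConnector fields.
object RetryLoopSketch {
  val isShuttingDown = new AtomicBoolean(false)
  val maxRetries = 4
  val backoffMs = 2000L

  // Stand-in for the real rebalance(cluster) call; returns true on success.
  def attemptRebalance(attempt: Int): Boolean = false

  def syncedRebalance(): Unit = {
    for (i <- 0 until maxRetries) {
      // Checked inside the loop: a shutdown signalled during the backoff
      // sleep is observed before the next attempt rather than after all
      // remaining retries have run.
      if (isShuttingDown.get())
        return
      val done =
        try attemptRebalance(i)
        catch { case e: Throwable => false } // transient ZK errors: just retry
      if (done)
        return
      Thread.sleep(backoffMs)
    }
  }

  def main(args: Array[String]): Unit = {
    isShuttingDown.set(true) // simulate a shutdown request already in flight
    syncedRebalance()        // returns immediately on the first check
  }
}

With the check at the top of the loop body rather than before the loop, the
early-return also removes the need for the old if/else nesting, which is why
the patch deletes one more line than it adds.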