
KAFKA-1848; check consumer shutting down flag inside retry loop; reviewed by Guozhang Wang

Aditya Auradkar 10 years ago committed by Guozhang Wang
parent commit a0852d477e
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (57 lines changed)

@@ -613,36 +613,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def syncedRebalance() {
     rebalanceLock synchronized {
       rebalanceTimer.time {
-        if(isShuttingDown.get()) {
-          return
-        } else {
-          for (i <- 0 until config.rebalanceMaxRetries) {
-            info("begin rebalancing consumer " + consumerIdString + " try #" + i)
-            var done = false
-            var cluster: Cluster = null
-            try {
-              cluster = getCluster(zkClient)
-              done = rebalance(cluster)
-            } catch {
-              case e: Throwable =>
-                /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-                  * For example, a ZK node can disappear between the time we get all children and the time we try to get
-                  * the value of a child. Just let this go since another rebalance will be triggered.
-                  **/
-                info("exception during rebalance ", e)
-            }
-            info("end rebalancing consumer " + consumerIdString + " try #" + i)
-            if (done) {
-              return
-            } else {
-              /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
-               * clear the cache */
-              info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
-            }
-            // stop all fetchers and clear all the queues to avoid data duplication
-            closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
-            Thread.sleep(config.rebalanceBackoffMs)
-          }
+        for (i <- 0 until config.rebalanceMaxRetries) {
+          if(isShuttingDown.get()) {
+            return
+          }
+          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+          var done = false
+          var cluster: Cluster = null
+          try {
+            cluster = getCluster(zkClient)
+            done = rebalance(cluster)
+          } catch {
+            case e: Throwable =>
+              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+                * For example, a ZK node can disappear between the time we get all children and the time we try to get
+                * the value of a child. Just let this go since another rebalance will be triggered.
+                **/
+              info("exception during rebalance ", e)
+          }
+          info("end rebalancing consumer " + consumerIdString + " try #" + i)
+          if (done) {
+            return
+          } else {
+            /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
+             * clear the cache */
+            info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+          }
+          // stop all fetchers and clear all the queues to avoid data duplication
+          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+          Thread.sleep(config.rebalanceBackoffMs)
         }
       }
     }
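The change is easiest to see in isolation. Before this commit, isShuttingDown was read once, before the retry loop, so a shutdown requested while the consumer was sleeping in Thread.sleep(config.rebalanceBackoffMs) did not stop the remaining retries; the commit moves the check inside the loop so every attempt bails out first. The sketch below is a minimal standalone illustration of that pattern, not Kafka code: RetryLoopSketch, maxRetries, backoffMs, and attemptRebalance are hypothetical stand-ins for config.rebalanceMaxRetries, config.rebalanceBackoffMs, and rebalance(cluster).

import java.util.concurrent.atomic.AtomicBoolean

object RetryLoopSketch {
  val isShuttingDown = new AtomicBoolean(false)
  val maxRetries = 4   // stand-in for config.rebalanceMaxRetries
  val backoffMs = 200L // stand-in for config.rebalanceBackoffMs

  // Stand-in for rebalance(cluster); always "fails" so the loop exercises the retry path.
  def attemptRebalance(attempt: Int): Boolean = false

  def syncedRebalance(): Unit = {
    for (i <- 0 until maxRetries) {
      // The fix: re-check the flag on every iteration, so a shutdown requested
      // during the backoff sleep below stops the next attempt instead of
      // letting all remaining retries run.
      if (isShuttingDown.get())
        return
      if (attemptRebalance(i))
        return
      Thread.sleep(backoffMs)
    }
  }

  def main(args: Array[String]): Unit = {
    // Simulate a shutdown arriving while syncedRebalance is backing off.
    val t = new Thread(() => syncedRebalance())
    t.start()
    Thread.sleep(300)        // let an attempt or two happen
    isShuttingDown.set(true) // request shutdown mid-retry
    t.join()                 // returns at the next loop-top check
  }
}

Running main shows the loop returning at the first check after the flag flips, rather than burning through the remaining retries and backoff sleeps.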
