Browse Source

KAFKA-3645; Fix ConsumerGroupCommand and ConsumerOffsetChecker to correctly read endpoint info from ZK

The host and port entries under /brokers/ids/<bid> gets filled only for PLAINTEXT security protocol. For other protocols the host is null and the actual endpoint is under "endpoints". This causes NPE when running the consumer group and offset checker scripts in a kerberized env. By always reading the host and port values from the "endpoint", a more meaningful exception would be thrown rather than a NPE.

Author: Arun Mahadevan <aiyer@hortonworks.com>

Reviewers: Sriharsha Chintalapani <schintalapani@hortonworks.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1301 from arunmahadevan/cg_kerb_fix
pull/1301/merge
Arun Mahadevan 9 years ago committed by Ismael Juma
parent
commit
ff300c9d4f
  1. 20
      core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
  2. 23
      core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala

20
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

@ -30,7 +30,7 @@ import org.apache.kafka.clients.CommonClientConfigs @@ -30,7 +30,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.BrokerNotAvailableException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils
@ -277,20 +277,10 @@ object ConsumerGroupCommand { @@ -277,20 +277,10 @@ object ConsumerGroupCommand {
private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
try {
zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
case Some(brokerInfoString) =>
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
}
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
}
zkUtils.getBrokerInfo(brokerId)
.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
.map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerGroupCommand"))
.orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)))
} catch {
case t: Throwable =>
println("Could not parse broker info due to " + t.getMessage)

23
core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala

@ -21,11 +21,12 @@ package kafka.tools @@ -21,11 +21,12 @@ package kafka.tools
import joptsimple._
import kafka.utils._
import kafka.consumer.SimpleConsumer
import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest}
import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest}
import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
import org.apache.kafka.common.errors.BrokerNotAvailableException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.security.JaasUtils
import scala.collection._
import kafka.client.ClientUtils
import kafka.network.BlockingChannel
@ -40,20 +41,10 @@ object ConsumerOffsetChecker extends Logging { @@ -40,20 +41,10 @@ object ConsumerOffsetChecker extends Logging {
private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = {
try {
zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
case Some(brokerInfoString) =>
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker"))
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
}
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
}
zkUtils.getBrokerInfo(bid)
.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
.map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerOffsetChecker"))
.orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)))
} catch {
case t: Throwable =>
println("Could not parse broker info due to " + t.getCause)

Loading…
Cancel
Save