Browse Source

KAFKA-6546: Use LISTENER_NOT_FOUND_ON_LEADER error for missing listener (#5189)

For metadata request version 6 and above, use a different error code to indicate missing listener on leader broker to enable diagnosis of listener configuration issues.

Reviewers: Ismael Juma <ismael@juma.me.uk>
pull/5263/head
Rajini Sivaram 7 years ago committed by GitHub
parent
commit
e5ec3f55c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
  2. 38
      clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java
  3. 10
      clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
  4. 11
      core/src/main/scala/kafka/server/KafkaApis.scala
  5. 21
      core/src/main/scala/kafka/server/MetadataCache.scala
  6. 37
      core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
  7. 50
      core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala

16
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@ -18,6 +18,7 @@ package org.apache.kafka.clients; @@ -18,6 +18,7 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
@ -52,6 +53,7 @@ import java.util.LinkedList; @@ -52,6 +53,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
@ -945,6 +947,20 @@ public class NetworkClient implements KafkaClient { @@ -945,6 +947,20 @@ public class NetworkClient implements KafkaClient {
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
this.metadataFetchInProgress = false;
Cluster cluster = response.cluster();
// If any partition has leader with missing listeners, log a few for diagnosing broker configuration
// issues. This could be a transient issue if listeners were added dynamically to brokers.
List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
topicMetadata.partitionMetadata().stream()
.filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
.map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
.collect(Collectors.toList());
if (!missingListenerPartitions.isEmpty()) {
int count = missingListenerPartitions.size();
log.warn("{} partitions have leader brokers without a matching listener, including {}",
count, missingListenerPartitions.subList(0, Math.min(10, count)));
}
// check if any topics metadata failed to get updated
Map<String, Errors> errors = response.errors();
if (!errors.isEmpty())

38
clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java

@ -0,0 +1,38 @@ @@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* The leader does not have an endpoint corresponding to the listener on which metadata was requested.
* This could indicate a broker configuration error or a transient error when listeners are updated
* dynamically and client requests are processed before all brokers have updated their listeners.
* This is currently used only for missing listeners on leader brokers, but may be used for followers
* in future.
*/
public class ListenerNotFoundException extends InvalidMetadataException {
private static final long serialVersionUID = 1L;
public ListenerNotFoundException(String message) {
super(message);
}
public ListenerNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

10
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DelegationTokenDisabledException; @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DelegationTokenDisabledException;
import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
import org.apache.kafka.common.errors.ListenerNotFoundException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@ -624,7 +625,14 @@ public enum Errors { @@ -624,7 +625,14 @@ public enum Errors {
public ApiException build(String message) {
return new InvalidFetchSessionEpochException(message);
}
});
}),
LISTENER_NOT_FOUND(72, "There is no listener on the leader broker that matches the listener on which metadata request was processed",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new ListenerNotFoundException(message);
}
}),;
private interface ApiExceptionBuilder {
ApiException build(String message);

11
core/src/main/scala/kafka/server/KafkaApis.scala

@ -991,8 +991,10 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -991,8 +991,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)
errorUnavailableEndpoints: Boolean,
errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = {
val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
} else {
@ -1068,12 +1070,15 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1068,12 +1070,15 @@ class KafkaApis(val requestChannel: RequestChannel,
// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
val errorUnavailableEndpoints = requestVersion == 0
// In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader.
// From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors.
val errorUnavailableListeners = requestVersion >= 6
val topicMetadata =
if (authorizedTopics.isEmpty)
Seq.empty[MetadataResponse.TopicMetadata]
else
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
errorUnavailableEndpoints)
errorUnavailableEndpoints, errorUnavailableListeners)
val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata

21
core/src/main/scala/kafka/server/MetadataCache.scala

@ -65,19 +65,29 @@ class MetadataCache(brokerId: Int) extends Logging { @@ -65,19 +65,29 @@ class MetadataCache(brokerId: Int) extends Logging {
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
// If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
// Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
cache.get(topic).map { partitions =>
partitions.map { case (partitionId, partitionState) =>
val topicPartition = TopicAndPartition(topic, partitionId)
val maybeLeader = getAliveEndpoint(partitionState.basePartitionState.leader, listenerName)
val leaderBrokerId = partitionState.basePartitionState.leader
val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName)
val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
maybeLeader match {
case None =>
val error = if (!aliveBrokers.contains(brokerId)) { // we are already holding the read lock
debug(s"Error while fetching metadata for $topicPartition: leader not available")
new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(),
Errors.LEADER_NOT_AVAILABLE
} else {
debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
}
new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(),
replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)
case Some(leader) =>
@ -112,10 +122,11 @@ class MetadataCache(brokerId: Int) extends Logging { @@ -112,10 +122,11 @@ class MetadataCache(brokerId: Int) extends Logging {
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false,
errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
inReadLock(partitionMetadataLock) {
topics.toSeq.flatMap { topic =>
getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata =>
getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
}
}

37
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

@ -817,9 +817,22 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @@ -817,9 +817,22 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
//verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
// Verify that a listener added to a subset of servers doesn't cause any issues
// when metadata is processed by the client.
addListener(servers.tail, "SCRAM_LISTENER", SecurityProtocol.SASL_PLAINTEXT, Seq("SCRAM-SHA-256"))
val bootstrap = TestUtils.bootstrapServers(servers.tail, new ListenerName("SCRAM_LISTENER"))
val producer = ProducerBuilder().bootstrapServers(bootstrap)
.securityProtocol(SecurityProtocol.SASL_PLAINTEXT)
.saslMechanism("SCRAM-SHA-256")
.maxRetries(1000)
.build()
val partitions = producer.partitionsFor(topic).asScala
assertEquals(0, partitions.count(p => p.leader != null && p.leader.id == servers.head.config.brokerId))
assertTrue("Did not find partitions with no leader", partitions.exists(_.leader == null))
}
private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val config = servers.head.config
val existingListenerCount = config.listeners.size
@ -860,14 +873,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @@ -860,14 +873,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}), "Listener not created")
if (saslMechanisms.nonEmpty)
saslMechanisms.foreach { mechanism =>
verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
}
else
verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol")
val brokerConfigs = describeConfig(adminClients.head).entries.asScala
val brokerConfigs = describeConfig(adminClients.head, servers).entries.asScala
props.asScala.foreach { case (name, value) =>
val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name"))
if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig)
@ -877,6 +883,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @@ -877,6 +883,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
addListener(servers, listenerName, securityProtocol, saslMechanisms)
if (saslMechanisms.nonEmpty)
saslMechanisms.foreach { mechanism =>
verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
}
else
verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol")
}
private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
@ -1006,7 +1023,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @@ -1006,7 +1023,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
private def describeConfig(adminClient: AdminClient): Config = {
private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = {
val configResources = servers.map { server =>
new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
}

50
core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala

@ -110,7 +110,47 @@ class MetadataCacheTest { @@ -110,7 +110,47 @@ class MetadataCacheTest {
}
@Test
def getTopicMetadataPartitionLeaderNotAvailable() {
def getTopicMetadataPartitionLeaderNotAvailable(): Unit = {
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null))
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true)
}
@Test
def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = {
val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
val broker0Endpoints = Seq(
new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName),
new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName))
val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName))
val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null))
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
}
@Test
def getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit = {
val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
val broker0Endpoints = Seq(
new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName),
new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName))
val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName))
val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null))
verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false)
}
private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Set[Broker],
listenerName: ListenerName,
leader: Int,
expectedError: Errors,
errorUnavailableListeners: Boolean): Unit = {
val topic = "topic"
val cache = new MetadataCache(1)
@ -118,11 +158,7 @@ class MetadataCacheTest { @@ -118,11 +158,7 @@ class MetadataCacheTest {
val zkVersion = 3
val controllerId = 2
val controllerEpoch = 1
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null))
val leader = 1
val leaderEpoch = 1
val partitionStates = Map(
new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0), asList()))
@ -132,7 +168,7 @@ class MetadataCacheTest { @@ -132,7 +168,7 @@ class MetadataCacheTest {
partitionStates.asJava, brokers.asJava).build()
cache.updateCache(15, updateMetadataRequest)
val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners)
assertEquals(1, topicMetadatas.size)
val topicMetadata = topicMetadatas.head
@ -143,7 +179,7 @@ class MetadataCacheTest { @@ -143,7 +179,7 @@ class MetadataCacheTest {
val partitionMetadata = partitionMetadatas.get(0)
assertEquals(0, partitionMetadata.partition)
assertEquals(Errors.LEADER_NOT_AVAILABLE, partitionMetadata.error)
assertEquals(expectedError, partitionMetadata.error)
assertTrue(partitionMetadata.isr.isEmpty)
assertEquals(1, partitionMetadata.replicas.size)
assertEquals(0, partitionMetadata.replicas.get(0).id)

Loading…
Cancel
Save