diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 619f7bdbdac..ea16ac9587f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Sensor; @@ -52,6 +53,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.stream.Collectors; /** * A network client for asynchronous request/response network i/o. This is an internal class used to implement the @@ -945,6 +947,20 @@ public class NetworkClient implements KafkaClient { public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) { this.metadataFetchInProgress = false; Cluster cluster = response.cluster(); + + // If any partition has leader with missing listeners, log a few for diagnosing broker configuration + // issues. This could be a transient issue if listeners were added dynamically to brokers. + List missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata -> + topicMetadata.partitionMetadata().stream() + .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND) + .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition()))) + .collect(Collectors.toList()); + if (!missingListenerPartitions.isEmpty()) { + int count = missingListenerPartitions.size(); + log.warn("{} partitions have leader brokers without a matching listener, including {}", + count, missingListenerPartitions.subList(0, Math.min(10, count))); + } + // check if any topics metadata failed to get updated Map errors = response.errors(); if (!errors.isEmpty()) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java new file mode 100644 index 00000000000..82c5d892f49 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/ListenerNotFoundException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * The leader does not have an endpoint corresponding to the listener on which metadata was requested. + * This could indicate a broker configuration error or a transient error when listeners are updated + * dynamically and client requests are processed before all brokers have updated their listeners. + * This is currently used only for missing listeners on leader brokers, but may be used for followers + * in future. + */ +public class ListenerNotFoundException extends InvalidMetadataException { + + private static final long serialVersionUID = 1L; + + public ListenerNotFoundException(String message) { + super(message); + } + + public ListenerNotFoundException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 5db1d314be3..9c522dff935 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.DelegationTokenDisabledException; import org.apache.kafka.common.errors.DelegationTokenExpiredException; import org.apache.kafka.common.errors.DelegationTokenNotFoundException; import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException; +import org.apache.kafka.common.errors.ListenerNotFoundException; import org.apache.kafka.common.errors.FetchSessionIdNotFoundException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; @@ -624,7 +625,14 @@ public enum Errors { public ApiException build(String message) { return new InvalidFetchSessionEpochException(message); } - }); + }), + LISTENER_NOT_FOUND(72, "There is no listener on the leader broker that matches the listener on which metadata request was processed", + new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new ListenerNotFoundException(message); + } + }),; private interface ApiExceptionBuilder { ApiException build(String message); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ebdf141f413..0c88be9bb5c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -991,8 +991,10 @@ class KafkaApis(val requestChannel: RequestChannel, } private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName, - errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { - val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints) + errorUnavailableEndpoints: Boolean, + errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata] = { + val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, + errorUnavailableEndpoints, errorUnavailableListeners) if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses } else { @@ -1068,12 +1070,15 @@ class KafkaApis(val requestChannel: RequestChannel, // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list val errorUnavailableEndpoints = requestVersion == 0 + // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader. + // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors. + val errorUnavailableListeners = requestVersion >= 6 val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[MetadataResponse.TopicMetadata] else getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName, - errorUnavailableEndpoints) + errorUnavailableEndpoints, errorUnavailableListeners) val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 43fe35287d3..b0603b87686 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -65,19 +65,29 @@ class MetadataCache(brokerId: Int) extends Logging { } // errorUnavailableEndpoints exists to support v0 MetadataResponses - private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { + // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. + // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). + private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean, + errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { cache.get(topic).map { partitions => partitions.map { case (partitionId, partitionState) => val topicPartition = TopicAndPartition(topic, partitionId) - val maybeLeader = getAliveEndpoint(partitionState.basePartitionState.leader, listenerName) + val leaderBrokerId = partitionState.basePartitionState.leader + val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName) val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt) val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints) val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints) maybeLeader match { case None => - debug(s"Error while fetching metadata for $topicPartition: leader not available") - new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, partitionId, Node.noNode(), + val error = if (!aliveBrokers.contains(brokerId)) { // we are already holding the read lock + debug(s"Error while fetching metadata for $topicPartition: leader not available") + Errors.LEADER_NOT_AVAILABLE + } else { + debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId") + if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE + } + new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(), replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava) case Some(leader) => @@ -112,10 +122,11 @@ class MetadataCache(brokerId: Int) extends Logging { } // errorUnavailableEndpoints exists to support v0 MetadataResponses - def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = { + def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false, + errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = { inReadLock(partitionMetadataLock) { topics.toSeq.flatMap { topic => - getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata => + getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava) } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index cb57bbf4b13..96cca23c23e 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -817,10 +817,23 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI")) //verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN")) verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI")) + + // Verify that a listener added to a subset of servers doesn't cause any issues + // when metadata is processed by the client. + addListener(servers.tail, "SCRAM_LISTENER", SecurityProtocol.SASL_PLAINTEXT, Seq("SCRAM-SHA-256")) + val bootstrap = TestUtils.bootstrapServers(servers.tail, new ListenerName("SCRAM_LISTENER")) + val producer = ProducerBuilder().bootstrapServers(bootstrap) + .securityProtocol(SecurityProtocol.SASL_PLAINTEXT) + .saslMechanism("SCRAM-SHA-256") + .maxRetries(1000) + .build() + val partitions = producer.partitionsFor(topic).asScala + assertEquals(0, partitions.count(p => p.leader != null && p.leader.id == servers.head.config.brokerId)) + assertTrue("Did not find partitions with no leader", partitions.exists(_.leader == null)) } - private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol, - saslMechanisms: Seq[String]): Unit = { + private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol, + saslMechanisms: Seq[String]): Unit = { val config = servers.head.config val existingListenerCount = config.listeners.size val listeners = config.listeners @@ -860,14 +873,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } }), "Listener not created") - if (saslMechanisms.nonEmpty) - saslMechanisms.foreach { mechanism => - verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism") - } - else - verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol") - - val brokerConfigs = describeConfig(adminClients.head).entries.asScala + val brokerConfigs = describeConfig(adminClients.head, servers).entries.asScala props.asScala.foreach { case (name, value) => val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name")) if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig) @@ -877,6 +883,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } + private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol, + saslMechanisms: Seq[String]): Unit = { + addListener(servers, listenerName, securityProtocol, saslMechanisms) + if (saslMechanisms.nonEmpty) + saslMechanisms.foreach { mechanism => + verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism") + } + else + verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol") + } + private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol, saslMechanisms: Seq[String]): Unit = { val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head @@ -1006,7 +1023,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } - private def describeConfig(adminClient: AdminClient): Config = { + private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = { val configResources = servers.map { server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) } diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 0ee73659780..82c14ee5827 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -110,7 +110,47 @@ class MetadataCacheTest { } @Test - def getTopicMetadataPartitionLeaderNotAvailable() { + def getTopicMetadataPartitionLeaderNotAvailable(): Unit = { + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null)) + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName, + leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false) + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, listenerName, + leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true) + } + + @Test + def getTopicMetadataPartitionListenerNotAvailableOnLeader(): Unit = { + val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL) + val broker0Endpoints = Seq( + new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName), + new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName)) + val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName)) + val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null)) + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName, + leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true) + } + + @Test + def getTopicMetadataPartitionListenerNotAvailableOnLeaderOldMetadataVersion(): Unit = { + val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL) + val broker0Endpoints = Seq( + new EndPoint("host0", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName), + new EndPoint("host0", 9093, SecurityProtocol.SSL, sslListenerName)) + val broker1Endpoints = Seq(new EndPoint("host1", 9092, SecurityProtocol.PLAINTEXT, plaintextListenerName)) + val brokers = Set(new Broker(0, broker0Endpoints.asJava, null), new Broker(1, broker1Endpoints.asJava, null)) + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName, + leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false) + } + + private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers: Set[Broker], + listenerName: ListenerName, + leader: Int, + expectedError: Errors, + errorUnavailableListeners: Boolean): Unit = { val topic = "topic" val cache = new MetadataCache(1) @@ -118,11 +158,7 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null)) - val leader = 1 val leaderEpoch = 1 val partitionStates = Map( new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0), asList())) @@ -132,7 +168,7 @@ class MetadataCacheTest { partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) - val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName) + val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableListeners = errorUnavailableListeners) assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head @@ -143,7 +179,7 @@ class MetadataCacheTest { val partitionMetadata = partitionMetadatas.get(0) assertEquals(0, partitionMetadata.partition) - assertEquals(Errors.LEADER_NOT_AVAILABLE, partitionMetadata.error) + assertEquals(expectedError, partitionMetadata.error) assertTrue(partitionMetadata.isr.isEmpty) assertEquals(1, partitionMetadata.replicas.size) assertEquals(0, partitionMetadata.replicas.get(0).id)