From cd3c0ab1a3de0e02f473edeeb986acff3fc87230 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Wed, 7 Jun 2023 13:11:55 -0700 Subject: [PATCH] KAFKA-15060: fix the ApiVersionManager interface This PR expands the scope of ApiVersionManager a bit to include returning the current MetadataVersion and features that are in effect. This is useful in general because that information needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager interface so that all subclasses implement all methods of the interface. Having subclasses that don't implement some methods is dangerous because they could cause exceptions at runtime in unexpected scenarios. On the KRaft controller, we were previously performing a read operation in the QuorumController thread to get the current metadata version and features. With this PR, we now read a volatile variable maintained by a separate MetadataVersionContextPublisher object. This will improve performance and simplify the code. It should not change the guarantees we are providing; in both the old and new scenarios, we need to be robust against version skew scenarios during updates. Add a Features class which just has a 3-tuple of metadata version, features, and feature epoch. Remove MetadataCache.FinalizedFeaturesAndEpoch, since it just duplicates the Features class. (There are some additional feature-related classes that can be consolidated in in a follow-on PR.) Create a java class, EndpointReadyFutures, for managing the futures associated with individual authorizer endpoints. This avoids code duplication between ControllerServer and BrokerServer and makes this code unit-testable. Reviewers: David Arthur , dengziming , Luke Chen --- checkstyle/import-control-server-common.xml | 4 + .../kafka/controller/KafkaController.scala | 8 +- .../kafka/server/ApiVersionManager.scala | 55 ++--- .../scala/kafka/server/BrokerServer.scala | 28 +-- .../scala/kafka/server/ControllerApis.scala | 15 +- .../scala/kafka/server/ControllerServer.scala | 31 ++- .../scala/kafka/server/MetadataCache.scala | 13 +- .../server/metadata/KRaftMetadataCache.scala | 18 +- .../server/metadata/ZkMetadataCache.scala | 47 ++-- .../kafka/tools/TestRaftRequestHandler.scala | 2 +- .../scala/kafka/tools/TestRaftServer.scala | 7 +- .../unit/kafka/network/SocketServerTest.scala | 6 +- .../kafka/server/ApiVersionManagerTest.scala | 12 +- .../kafka/server/ControllerApisTest.scala | 10 +- .../server/FinalizedFeatureCacheTest.scala | 20 +- .../FinalizedFeatureChangeListenerTest.scala | 21 +- .../unit/kafka/server/KafkaApisTest.scala | 10 +- .../KRaftMetadataRequestBenchmark.java | 8 +- .../metadata/MetadataRequestBenchmark.java | 7 +- .../metadata/publisher/FeaturesPublisher.java | 54 +++++ .../apache/kafka/server/common/Features.java | 87 +++++++ .../server/network/EndpointReadyFutures.java | 219 ++++++++++++++++++ .../network/KafkaAuthorizerServerInfo.java | 108 +++++++++ .../kafka/server/common/FeaturesTest.java | 50 ++++ .../network/EndpointReadyFuturesTest.java | 169 ++++++++++++++ 25 files changed, 857 insertions(+), 152 deletions(-) create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java create mode 100644 server-common/src/main/java/org/apache/kafka/server/common/Features.java create mode 100644 server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java create mode 100644 server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java create mode 100644 server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java create mode 100644 server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml index d310d81a832..f238bc9b8d9 100644 --- a/checkstyle/import-control-server-common.xml +++ b/checkstyle/import-control-server-common.xml @@ -77,6 +77,10 @@ + + + + diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index ee98123059a..c3f76e83d12 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1598,7 +1598,9 @@ class KafkaController(val config: KafkaConfig, !config.isFeatureVersioningSupported || !featureCache.getFeatureOption.exists( latestFinalizedFeatures => - BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.features)) + BrokerFeatures.hasIncompatibleFeatures(broker.features, + latestFinalizedFeatures.finalizedFeatures().asScala. + map(kv => (kv._1, kv._2.toShort)).toMap)) } } @@ -2081,8 +2083,8 @@ class KafkaController(val config: KafkaConfig, callback: UpdateFeaturesCallback): Unit = { val updates = request.featureUpdates val existingFeatures = featureCache.getFeatureOption - .map(featuresAndEpoch => featuresAndEpoch.features) - .getOrElse(Map[String, Short]()) + .map(featuresAndEpoch => featuresAndEpoch.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap) + .getOrElse(Map[String, Short]()) // A map with key being feature name and value being finalized version. // This contains the target features to be eventually written to FeatureZNode. val targetFeatures = scala.collection.mutable.Map[String, Short]() ++ existingFeatures diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index c3346a5edb2..fa796bc6688 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -18,10 +18,11 @@ package kafka.server import kafka.network import kafka.network.RequestChannel -import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.feature.SupportedVersionRange import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.ApiVersionsResponse +import org.apache.kafka.server.common.Features import scala.jdk.CollectionConverters._ @@ -30,22 +31,14 @@ trait ApiVersionManager { def listenerType: ListenerType def enabledApis: collection.Set[ApiKeys] - /** - * @see [[DefaultApiVersionManager.apiVersionResponse]] - * @see [[kafka.server.KafkaApis.handleApiVersionsRequest]] - */ def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse - /** - * @see [[SimpleApiVersionManager.apiVersionResponse]] - * @see [[kafka.server.ControllerApis.handleApiVersionsRequest]] - */ - def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse - def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = { apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion) } def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis) + + def features: Features } object ApiVersionManager { @@ -80,41 +73,43 @@ object ApiVersionManager { class SimpleApiVersionManager( val listenerType: ListenerType, val enabledApis: collection.Set[ApiKeys], - brokerFeatures: Features[SupportedVersionRange], + brokerFeatures: org.apache.kafka.common.feature.Features[SupportedVersionRange], val enableUnstableLastVersion: Boolean, - val zkMigrationEnabled: Boolean + val zkMigrationEnabled: Boolean, + val featuresProvider: () => Features ) extends ApiVersionManager { def this( listenerType: ListenerType, enableUnstableLastVersion: Boolean, - zkMigrationEnabled: Boolean + zkMigrationEnabled: Boolean, + featuresProvider: () => Features ) = { this( listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures(), enableUnstableLastVersion, - zkMigrationEnabled + zkMigrationEnabled, + featuresProvider ) } private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion) - override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = { - throw new UnsupportedOperationException("This method is not supported in SimpleApiVersionManager, use apiVersionResponse(throttleTimeMs, finalizedFeatures, epoch) instead") - } - - override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse = { + override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = { + val currentFeatures = features ApiVersionsResponse.createApiVersionsResponse( throttleTimeMs, apiVersions, brokerFeatures, - finalizedFeatures.asJava, - finalizedFeaturesEpoch, + currentFeatures.finalizedFeatures(), + currentFeatures.finalizedFeaturesEpoch(), zkMigrationEnabled ) } + + override def features: Features = featuresProvider.apply() } /** @@ -124,7 +119,7 @@ class SimpleApiVersionManager( * * @param listenerType the listener type * @param forwardingManager the forwarding manager, - * @param features the broker features + * @param brokerFeatures the broker features * @param metadataCache the metadata cache, used to get the finalized features and the metadata version * @param enableUnstableLastVersion whether to enable unstable last version, see [[KafkaConfig.unstableApiVersionsEnabled]] * @param zkMigrationEnabled whether to enable zk migration, see [[KafkaConfig.migrationEnabled]] @@ -132,7 +127,7 @@ class SimpleApiVersionManager( class DefaultApiVersionManager( val listenerType: ListenerType, forwardingManager: Option[ForwardingManager], - features: BrokerFeatures, + brokerFeatures: BrokerFeatures, metadataCache: MetadataCache, val enableUnstableLastVersion: Boolean, val zkMigrationEnabled: Boolean = false @@ -141,16 +136,16 @@ class DefaultApiVersionManager( val enabledApis = ApiKeys.apisForListener(listenerType).asScala override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = { - val supportedFeatures = features.supportedFeatures + val supportedFeatures = brokerFeatures.supportedFeatures val finalizedFeatures = metadataCache.features() val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions) ApiVersionsResponse.createApiVersionsResponse( throttleTimeMs, - metadataCache.metadataVersion().highestSupportedRecordVersion, + finalizedFeatures.metadataVersion().highestSupportedRecordVersion, supportedFeatures, - finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, - finalizedFeatures.epoch, + finalizedFeatures.finalizedFeatures(), + finalizedFeatures.finalizedFeaturesEpoch(), controllerApiVersions.orNull, listenerType, enableUnstableLastVersion, @@ -158,7 +153,5 @@ class DefaultApiVersionManager( ) } - override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = { - throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead") - } + override def features: Features = metadataCache.features() } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 22e14e96b0a..2bf29c32d97 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.cluster.Broker.ServerInfo import kafka.cluster.EndPoint import kafka.coordinator.group.GroupCoordinatorAdapter import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator} @@ -48,6 +47,7 @@ import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler} import org.apache.kafka.storage.internals.log.LogDirFailureChannel @@ -366,25 +366,10 @@ class BrokerServer( config.interBrokerListenerName.value() + ". Found listener(s): " + endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", ")) } - val authorizerInfo = ServerInfo(new ClusterResource(clusterId), - config.nodeId, - endpoints, - interBrokerListener, - config.earlyStartListeners.map(_.value()).asJava) // Create and initialize an authorizer if one is configured. authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match { - case Some(authZ) => - authZ.start(authorizerInfo).asScala.map { case (ep, cs) => - ep -> cs.toCompletableFuture - } - case None => - authorizerInfo.endpoints.asScala.map { ep => - ep -> CompletableFuture.completedFuture[Void](null) - }.toMap - } val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, @@ -504,6 +489,17 @@ class BrokerServer( // Enable inbound TCP connections. Each endpoint will be started only once its matching // authorizer future is completed. + val endpointReadyFutures = { + val builder = new EndpointReadyFutures.Builder() + builder.build(authorizer.asJava, + new KafkaAuthorizerServerInfo( + new ClusterResource(clusterId), + config.nodeId, + endpoints, + interBrokerListener, + config.earlyStartListeners.map(_.value()).asJava)) + } + val authorizerFutures = endpointReadyFutures.futures().asScala.toMap val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures) // Block here until all the authorizer futures are complete. diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index ba3da5c47c7..1be517e25d7 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel, if (apiVersionRequest.hasUnsupportedRequestVersion) { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) } else if (!apiVersionRequest.isValid) { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception)) - CompletableFuture.completedFuture[Unit](()) } else { - val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty()) - controller.finalizedFeatures(context).handle { (result, exception) => - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - if (exception != null) { - apiVersionRequest.getErrorResponse(requestThrottleMs, exception) - } else { - apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch()) - } - }) - } + requestHelper.sendResponseMaybeThrottle(request, + requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs)) } + CompletableFuture.completedFuture[Unit](()) } def authorizeAlterResource(requestContext: RequestContext, diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index b45445e1b0d..36d8f5eca1e 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.cluster.Broker.ServerInfo import kafka.metrics.LinuxIoMetricsCollector import kafka.migration.MigrationPropagator import kafka.network.{DataPlaneAcceptor, SocketServer} @@ -43,10 +42,12 @@ import org.apache.kafka.metadata.KafkaConfigSchema import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator} +import org.apache.kafka.metadata.publisher.FeaturesPublisher import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} +import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} import org.apache.kafka.server.util.{Deadline, FutureUtils} @@ -115,6 +116,7 @@ class ControllerServer( var migrationSupport: Option[ControllerMigrationSupport] = None def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]() + val featuresPublisher = new FeaturesPublisher() private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { lock.lock() @@ -153,30 +155,22 @@ class ControllerServer( authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match { - case Some(authZ) => - // It would be nice to remove some of the broker-specific assumptions from - // AuthorizerServerInfo, such as the assumption that there is an inter-broker - // listener, or that ID is named brokerId. - val controllerAuthorizerInfo = ServerInfo( + val endpointReadyFutures = { + val builder = new EndpointReadyFutures.Builder() + builder.build(authorizer.asJava, + new KafkaAuthorizerServerInfo( new ClusterResource(clusterId), config.nodeId, javaListeners, javaListeners.get(0), - config.earlyStartListeners.map(_.value()).asJava) - authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) => - ep -> cs.toCompletableFuture - }.toMap - case None => - javaListeners.asScala.map { - ep => ep -> CompletableFuture.completedFuture[Void](null) - }.toMap + config.earlyStartListeners.map(_.value()).asJava)) } val apiVersionManager = new SimpleApiVersionManager( ListenerType.CONTROLLER, config.unstableApiVersionsEnabled, - config.migrationEnabled + config.migrationEnabled, + () => featuresPublisher.features() ) tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) @@ -302,6 +296,8 @@ class ControllerServer( s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix) + val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = endpointReadyFutures.futures().asScala.toMap + /** * Enable the controller endpoint(s). If we are using an authorizer which stores * ACLs in the metadata log, such as StandardAuthorizer, we will be able to start @@ -326,6 +322,9 @@ class ControllerServer( // register this instance for dynamic config changes to the KafkaConfig config.dynamicConfig.addReconfigurables(this) + // Set up the metadata features publisher. + metadataPublishers.add(featuresPublisher) + // Set up the dynamic config publisher. This runs even in combined mode, since the broker // has its own separate dynamic configuration object. metadataPublishers.add(new DynamicConfigPublisher( diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 342b23cec4b..ecd64c17b91 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -22,16 +22,10 @@ import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid} -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{Features, MetadataVersion} import java.util -case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) { - override def toString(): String = { - s"FinalizedFeaturesAndEpoch(features=$features, epoch=$epoch)" - } -} - /** * Used to represent the controller id cached in the metadata cache of the broker. This trait is * extended to represent if the controller is KRaft controller or Zk controller. @@ -44,7 +38,6 @@ case class ZkCachedControllerId(id: Int) extends CachedControllerId case class KRaftCachedControllerId(id: Int) extends CachedControllerId trait MetadataCache { - /** * Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details * on the use of the two boolean flags. @@ -113,9 +106,9 @@ trait MetadataCache { def metadataVersion(): MetadataVersion - def features(): FinalizedFeaturesAndEpoch - def getRandomAliveBrokerId: Option[Int] + + def features(): Features } object MetadataCache { diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 300bb3e5cab..5e3ed11022b 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -18,7 +18,7 @@ package kafka.server.metadata import kafka.controller.StateChangeLogger -import kafka.server.{CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache} +import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache} import kafka.utils.Logging import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} @@ -37,7 +37,7 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData} import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData} import org.apache.kafka.metadata.{PartitionRegistration, Replicas} -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{Features, MetadataVersion} import scala.collection.{Seq, Set, mutable} import scala.jdk.CollectionConverters._ @@ -393,15 +393,11 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w override def metadataVersion(): MetadataVersion = _currentImage.features().metadataVersion() - override def features(): FinalizedFeaturesAndEpoch = { + override def features(): Features = { val image = _currentImage - val features = image.features().finalizedVersions().asScala.map { - case (name: String, level: java.lang.Short) => name -> Short2short(level) - } - features.put(MetadataVersion.FEATURE_NAME, image.features().metadataVersion().featureLevel()) - - FinalizedFeaturesAndEpoch( - features.toMap, - image.highestOffsetAndEpoch().offset) + new Features(image.features().metadataVersion(), + image.features().finalizedVersions(), + image.highestOffsetAndEpoch().offset, + true) } } diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index feaaf1c43f1..9159b791880 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._ import kafka.cluster.{Broker, EndPoint} import kafka.api._ import kafka.controller.StateChangeLogger -import kafka.server.{BrokerFeatures, CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId} +import kafka.server.{BrokerFeatures, CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId} import kafka.utils.CoreUtils._ import kafka.utils.Logging import kafka.utils.Implicits._ @@ -40,7 +40,7 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{Features, MetadataVersion} import java.util.concurrent.{ThreadLocalRandom, TimeUnit} import scala.concurrent.TimeoutException @@ -53,7 +53,7 @@ class FeatureCacheUpdateException(message: String) extends RuntimeException(mess trait ZkFinalizedFeatureCache { def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit - def getFeatureOption: Option[FinalizedFeaturesAndEpoch] + def getFeatureOption: Option[Features] } /** @@ -83,7 +83,7 @@ class ZkMetadataCache( private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) // Features are updated via ZK notification (see FinalizedFeatureChangeListener) - @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + @volatile private var _features: Option[Features] = Option.empty private val featureLock = new ReentrantLock() private val featureCond = featureLock.newCondition() @@ -488,11 +488,12 @@ class ZkMetadataCache( override def metadataVersion(): MetadataVersion = metadataVersion - override def features(): FinalizedFeaturesAndEpoch = { - featuresAndEpoch match { - case Some(features) => features - case None => FinalizedFeaturesAndEpoch(Map.empty, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH) - } + override def features(): Features = _features match { + case Some(features) => features + case None => new Features(metadataVersion, + Collections.emptyMap(), + ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, + false) } /** @@ -509,14 +510,18 @@ class ZkMetadataCache( * not modified. */ def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = { - val latest = FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch) - val existing = featuresAndEpoch.map(item => item.toString()).getOrElse("") - if (featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch > latest.epoch) { + val latest = new Features(metadataVersion, + latestFeatures.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, + latestEpoch, + false) + val existing = _features + if (existing.isDefined && existing.get.finalizedFeaturesEpoch() > latest.finalizedFeaturesEpoch()) { val errorMsg = s"FinalizedFeatureCache update failed due to invalid epoch in new $latest." + s" The existing cache contents are $existing." throw new FeatureCacheUpdateException(errorMsg) } else { - val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features) + val incompatibleFeatures = brokerFeatures.incompatibleFeatures( + latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap) if (incompatibleFeatures.nonEmpty) { val errorMsg = "FinalizedFeatureCache update failed since feature compatibility" + s" checks failed! Supported ${brokerFeatures.supportedFeatures} has incompatibilities" + @@ -525,7 +530,7 @@ class ZkMetadataCache( } else { val logMsg = s"Updated cache from existing $existing to latest $latest." inLock(featureLock) { - featuresAndEpoch = Some(latest) + _features = Some(latest) featureCond.signalAll() } info(logMsg) @@ -533,13 +538,12 @@ class ZkMetadataCache( } } - /** * Clears all existing finalized features and epoch from the cache. */ def clearFeatures(): Unit = { inLock(featureLock) { - featuresAndEpoch = None + _features = None featureCond.signalAll() } } @@ -565,12 +569,12 @@ class ZkMetadataCache( } val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1000000) inLock(featureLock) { - while (!(featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch >= minExpectedEpoch)) { + while (!(_features.isDefined && _features.get.finalizedFeaturesEpoch() >= minExpectedEpoch)) { val nowNanos = System.nanoTime() if (nowNanos > waitEndTimeNanos) { throw new TimeoutException( s"Timed out after waiting for ${timeoutMs}ms for required condition to be met." + - s" Current epoch: ${featuresAndEpoch.map(fe => fe.epoch).getOrElse("")}.") + s" Current epoch: ${_features.map(fe => fe.finalizedFeaturesEpoch()).getOrElse("")}.") } val sleepTimeMs = max(1L, (waitEndTimeNanos - nowNanos) / 1000000) featureCond.await(sleepTimeMs, TimeUnit.MILLISECONDS) @@ -578,10 +582,5 @@ class ZkMetadataCache( } } - /** - * @return the latest known FinalizedFeaturesAndEpoch or empty if not defined in the cache. - */ - def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = { - featuresAndEpoch - } + override def getFeatureOption: Option[Features] = _features } diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala index b379efeb858..a9b471b1622 100644 --- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala +++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala @@ -64,7 +64,7 @@ class TestRaftRequestHandler( } private def handleApiVersions(request: RequestChannel.Request): Unit = { - requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0, Map.empty, -1), None) + requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0), None) } private def handleVote(request: RequestChannel.Request): Unit = { diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index be45bae578d..1026c528473 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{TopicPartition, Uuid, protocol} import org.apache.kafka.raft.errors.NotLeaderException import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig} +import org.apache.kafka.server.common.{Features, MetadataVersion} import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils, ShutdownableThread} @@ -74,7 +75,11 @@ class TestRaftServer( tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) - val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false) + val apiVersionManager = new SimpleApiVersionManager( + ListenerType.CONTROLLER, + true, + false, + () => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION)) socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager) val metaProperties = MetaProperties( diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 5e9e84fd465..58ba9d3af7f 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -24,7 +24,7 @@ import java.nio.channels.{SelectionKey, SocketChannel} import java.nio.charset.StandardCharsets import java.util import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, ExecutionException, Executors, TimeUnit} -import java.util.{Properties, Random} +import java.util.{Collections, Properties, Random} import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode} import com.yammer.metrics.core.{Gauge, Meter} @@ -46,6 +46,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.utils.{AppInfoParser, LogContext, MockTime, Time, Utils} +import org.apache.kafka.server.common.{Features, MetadataVersion} import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils} import org.apache.log4j.Level import org.junit.jupiter.api.Assertions._ @@ -77,7 +78,8 @@ class SocketServerTest { // Clean-up any metrics left around by previous tests TestUtils.clearYammerMetrics() - private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false) + private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, false, + () => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, true)) val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager) server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES) val sockets = new ArrayBuffer[Socket] diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala index bcc84443f16..6c9c3a70ca6 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala @@ -39,7 +39,7 @@ class ApiVersionManagerTest { val versionManager = new DefaultApiVersionManager( listenerType = apiScope, forwardingManager = None, - features = brokerFeatures, + brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = true ) @@ -57,7 +57,7 @@ class ApiVersionManagerTest { val versionManager = new DefaultApiVersionManager( listenerType = apiScope, forwardingManager = None, - features = brokerFeatures, + brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = false ) @@ -86,7 +86,7 @@ class ApiVersionManagerTest { val versionManager = new DefaultApiVersionManager( listenerType = ListenerType.ZK_BROKER, forwardingManager = Some(forwardingManager), - features = brokerFeatures, + brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = true ) @@ -107,7 +107,7 @@ class ApiVersionManagerTest { val versionManager = new DefaultApiVersionManager( listenerType = ListenerType.BROKER, forwardingManager = forwardingManagerOpt, - features = brokerFeatures, + brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = true ) @@ -129,7 +129,7 @@ class ApiVersionManagerTest { val versionManager = new DefaultApiVersionManager( listenerType = ListenerType.ZK_BROKER, forwardingManager = Some(forwardingManager), - features = brokerFeatures, + brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = true ) @@ -148,7 +148,7 @@ class ApiVersionManagerTest { val versionManager = new DefaultApiVersionManager( listenerType = ListenerType.ZK_BROKER, forwardingManager = None, - features = brokerFeatures, + brokerFeatures = brokerFeatures, metadataCache = metadataCache, enableUnstableLastVersion = true ) diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 5243a21f1be..bd2a306719b 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -46,12 +46,12 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.{ElectionType, Uuid} import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT import org.apache.kafka.controller.{Controller, ControllerRequestContext, ResultOrError} import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} -import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock} -import org.apache.kafka.server.util.MockTime +import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion, ProducerIdsBlock} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -155,7 +155,11 @@ class ControllerApisTest { new KafkaConfig(props), MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId), Seq.empty, - new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false) + new SimpleApiVersionManager( + ListenerType.CONTROLLER, + true, + false, + () => Features.fromKRaftVersion(MetadataVersion.latest())) ) } diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala index 5eb562fb29f..b94bfb16c7e 100644 --- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala @@ -32,6 +32,10 @@ class FinalizedFeatureCacheTest { assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, BrokerFeatures.createDefault()).getFeatureOption.isEmpty) } + def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = { + input.map(kv => kv._1 -> kv._2.asInstanceOf[java.lang.Short]).asJava + } + @Test def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = { val supportedFeatures = Map[String, SupportedVersionRange]( @@ -44,15 +48,15 @@ class FinalizedFeatureCacheTest { val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures) cache.updateFeaturesOrThrow(finalizedFeatures, 10) assertTrue(cache.getFeatureOption.isDefined) - assertEquals(finalizedFeatures, cache.getFeatureOption.get.features) - assertEquals(10, cache.getFeatureOption.get.epoch) + assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures()) + assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch()) assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateFeaturesOrThrow(finalizedFeatures, 9)) // Check that the failed updateOrThrow call did not make any mutations. assertTrue(cache.getFeatureOption.isDefined) - assertEquals(finalizedFeatures, cache.getFeatureOption.get.features) - assertEquals(10, cache.getFeatureOption.get.epoch) + assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures()) + assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch()) } @Test @@ -83,8 +87,8 @@ class FinalizedFeatureCacheTest { val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures) cache.updateFeaturesOrThrow(finalizedFeatures, 12) assertTrue(cache.getFeatureOption.isDefined) - assertEquals(finalizedFeatures, cache.getFeatureOption.get.features) - assertEquals(12, cache.getFeatureOption.get.epoch) + assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures()) + assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch()) } @Test @@ -99,8 +103,8 @@ class FinalizedFeatureCacheTest { val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures) cache.updateFeaturesOrThrow(finalizedFeatures, 12) assertTrue(cache.getFeatureOption.isDefined) - assertEquals(finalizedFeatures, cache.getFeatureOption.get.features) - assertEquals(12, cache.getFeatureOption.get.epoch) + assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures()) + assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch()) cache.clearFeatures() assertTrue(cache.getFeatureOption.isEmpty) diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala index 67313ba3c26..844cd528c43 100644 --- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala @@ -21,6 +21,7 @@ import kafka.server.metadata.ZkMetadataCache import kafka.utils.TestUtils import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion} import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.server.common.{Features => JFeatures} import org.apache.kafka.common.utils.Exit import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0 @@ -32,6 +33,15 @@ import java.util.concurrent.{CountDownLatch, TimeoutException} import scala.jdk.CollectionConverters._ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness { + case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) { + override def toString(): String = { + s"FinalizedFeaturesAndEpoch(features=$features, epoch=$epoch)" + } + } + + def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = { + input.map(kv => kv._1 -> kv._2.asInstanceOf[java.lang.Short]).asJava + } private def createBrokerFeatures(): BrokerFeatures = { val supportedFeaturesMap = Map[String, SupportedVersionRange]( @@ -64,8 +74,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness { val mayBeNewCacheContent = cache.getFeatureOption assertFalse(mayBeNewCacheContent.isEmpty) val newCacheContent = mayBeNewCacheContent.get - assertEquals(expectedCacheContent.get.features, newCacheContent.features) - assertEquals(expectedCacheContent.get.epoch, newCacheContent.epoch) + assertEquals(asJava(expectedCacheContent.get.features), newCacheContent.finalizedFeatures()) + assertEquals(expectedCacheContent.get.epoch, newCacheContent.finalizedFeaturesEpoch()) } else { val mayBeNewCacheContent = cache.getFeatureOption assertTrue(mayBeNewCacheContent.isEmpty) @@ -94,7 +104,9 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness { assertTrue(updatedVersion > initialFinalizedFeatures.epoch) cache.waitUntilFeatureEpochOrThrow(updatedVersion, JTestUtils.DEFAULT_MAX_WAIT_MS) - assertEquals(FinalizedFeaturesAndEpoch(finalizedFeatures, updatedVersion), cache.getFeatureOption.get) + assertEquals(new JFeatures(MetadataVersion.IBP_2_8_IV1, + asJava(finalizedFeatures), updatedVersion, false), + cache.getFeatureOption.get) assertTrue(listener.isListenerInitiated) } @@ -248,7 +260,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness { listener.isListenerDead && // Make sure the cache contents are as expected, and, the incompatible features were not // applied. - cache.getFeatureOption.get.equals(initialFinalizedFeatures) + cache.getFeatureOption.get.equals(new JFeatures(MetadataVersion.IBP_2_8_IV1, + asJava(initialFinalizedFeatures.features), initialFinalizedFeatures.epoch, false)) }, "Timed out waiting for listener death and FinalizedFeatureCache to be updated") } finally { Exit.resetExitProcedure() diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 40023dd9859..fd95b74ad9f 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -93,7 +93,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection} import org.apache.kafka.coordinator.group.GroupCoordinator -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{Features, MetadataVersion} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData} @@ -185,7 +185,13 @@ class KafkaApisTest { } else { ApiKeys.apisForListener(listenerType).asScala.toSet } - val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false) + val apiVersionManager = new SimpleApiVersionManager( + listenerType, + enabledApis, + BrokerFeatures.defaultSupportedFeatures(), + true, + false, + () => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport)) new KafkaApis( requestChannel = requestChannel, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java index 25e182b895c..13409a57044 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -59,6 +59,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinator; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.MetadataVersion; import org.mockito.Mockito; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -197,7 +199,11 @@ public class KRaftMetadataRequestBenchmark { setClusterId("clusterId"). setTime(Time.SYSTEM). setTokenManager(null). - setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false)). + setApiVersionManager(new SimpleApiVersionManager( + ApiMessageType.ListenerType.BROKER, + false, + false, + () -> Features.fromKRaftVersion(MetadataVersion.latest()))). build(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 1685105e9d1..5f0bcec62f0 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -60,6 +60,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.GroupCoordinator; +import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.MetadataVersion; import org.mockito.Mockito; import org.openjdk.jmh.annotations.Benchmark; @@ -199,7 +200,11 @@ public class MetadataRequestBenchmark { setClusterId("clusterId"). setTime(Time.SYSTEM). setTokenManager(null). - setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false, false)). + setApiVersionManager(new SimpleApiVersionManager( + ApiMessageType.ListenerType.ZK_BROKER, + false, + false, + () -> Features.fromKRaftVersion(MetadataVersion.latest()))). build(); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java new file mode 100644 index 00000000000..8be90ec87f5 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.publisher; + +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.server.common.Features; + +import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; + + +public class FeaturesPublisher implements MetadataPublisher { + private volatile Features features = Features.fromKRaftVersion(MINIMUM_KRAFT_VERSION); + + public Features features() { + return features; + } + + @Override + public String name() { + return "FeaturesPublisher"; + } + + @Override + public void onMetadataUpdate( + MetadataDelta delta, + MetadataImage newImage, + LoaderManifest manifest + ) { + if (delta.featuresDelta() != null) { + features = new Features(newImage.features().metadataVersion(), + newImage.features().finalizedVersions(), + newImage.provenance().lastContainedOffset(), + true); + } + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Features.java b/server-common/src/main/java/org/apache/kafka/server/common/Features.java new file mode 100644 index 00000000000..ba411048063 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; + +public final class Features { + private final MetadataVersion version; + private final Map finalizedFeatures; + private final long finalizedFeaturesEpoch; + + public static Features fromKRaftVersion(MetadataVersion version) { + return new Features(version, Collections.emptyMap(), -1, true); + } + + public Features( + MetadataVersion version, + Map finalizedFeatures, + long finalizedFeaturesEpoch, + boolean kraftMode + ) { + this.version = version; + this.finalizedFeatures = new HashMap<>(finalizedFeatures); + this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; + // In KRaft mode, we always include the metadata version in the features map. + // In ZK mode, we never include it. + if (kraftMode) { + this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); + } else { + this.finalizedFeatures.remove(FEATURE_NAME); + } + } + + public MetadataVersion metadataVersion() { + return version; + } + + public Map finalizedFeatures() { + return finalizedFeatures; + } + + public long finalizedFeaturesEpoch() { + return finalizedFeaturesEpoch; + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o.getClass().equals(Features.class))) return false; + Features other = (Features) o; + return version == other.version && + finalizedFeatures.equals(other.finalizedFeatures) && + finalizedFeaturesEpoch == other.finalizedFeaturesEpoch; + } + + @Override + public int hashCode() { + return Objects.hash(version, finalizedFeatures, finalizedFeaturesEpoch); + } + + @Override + public String toString() { + return "Features" + + "(version=" + version + + ", finalizedFeatures=" + finalizedFeatures + + ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch + + ")"; + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java new file mode 100644 index 00000000000..1841cba9f19 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.network; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.authorizer.Authorizer; +import org.apache.kafka.server.authorizer.AuthorizerServerInfo; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * Manages a set of per-endpoint futures. + */ +public class EndpointReadyFutures { + public static class Builder { + private LogContext logContext = null; + private final Map> endpointStages = new HashMap<>(); + private final List stages = new ArrayList<>(); + + /** + * Add a readiness future that will block all endpoints. + * + * @param name The future name. + * @param future The future object. + * + * @return This builder object. + */ + public Builder addReadinessFuture( + String name, + CompletableFuture future + ) { + stages.add(new EndpointCompletionStage(name, future)); + return this; + } + + /** + * Add readiness futures for individual endpoints. + * + * @param name The future name. + * @param newFutures A map from endpoints to futures. + * + * @return This builder object. + */ + public Builder addReadinessFutures( + String name, + Map> newFutures + ) { + newFutures.forEach((endpoint, future) -> { + endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()). + add(new EndpointCompletionStage(name, future)); + }); + return this; + } + + /** + * Build the EndpointReadyFutures object. + * + * @param authorizer The authorizer to use, if any. Will be started. + * @param info Server information to be passed to the authorizer. + * + * @return The new futures object. + */ + public EndpointReadyFutures build( + Optional authorizer, + AuthorizerServerInfo info + ) { + if (authorizer.isPresent()) { + return build(authorizer.get().start(info), info); + } else { + return build(Collections.emptyMap(), info); + } + } + + EndpointReadyFutures build( + Map> authorizerStartFutures, + AuthorizerServerInfo info + ) { + if (logContext == null) logContext = new LogContext(); + Map> effectiveStartFutures = + new HashMap<>(authorizerStartFutures); + for (Endpoint endpoint : info.endpoints()) { + if (!effectiveStartFutures.containsKey(endpoint)) { + CompletableFuture completedFuture = CompletableFuture.completedFuture(null); + effectiveStartFutures.put(endpoint, completedFuture); + } + } + if (info.endpoints().size() != effectiveStartFutures.size()) { + List notInInfo = new ArrayList<>(); + for (Endpoint endpoint : effectiveStartFutures.keySet()) { + if (!info.endpoints().contains(endpoint)) { + notInInfo.add(endpoint.listenerName().orElse("[none]")); + } + } + throw new RuntimeException("Found authorizer futures that weren't included " + + "in AuthorizerServerInfo: " + notInInfo); + } + addReadinessFutures("authorizerStart", effectiveStartFutures); + stages.forEach(stage -> { + Map> newReadinessFutures = new HashMap<>(); + info.endpoints().forEach(endpoint -> { + newReadinessFutures.put(endpoint, stage.future); + }); + addReadinessFutures(stage.name, newReadinessFutures); + }); + return new EndpointReadyFutures(logContext, + endpointStages); + } + } + + static class EndpointCompletionStage { + final String name; + final CompletionStage future; + + EndpointCompletionStage(String name, CompletionStage future) { + this.name = name; + this.future = future; + } + } + + class EndpointReadyFuture { + final String endpointName; + final TreeSet incomplete; + final CompletableFuture future; + + EndpointReadyFuture(Endpoint endpoint, Collection stageNames) { + this.endpointName = endpoint.listenerName().orElse("UNNAMED"); + this.incomplete = new TreeSet<>(stageNames); + this.future = new CompletableFuture<>(); + } + + void completeStage(String stageName) { + boolean done = false; + synchronized (EndpointReadyFuture.this) { + if (incomplete.remove(stageName)) { + if (incomplete.isEmpty()) { + done = true; + } else { + log.info("{} completed for endpoint {}. Still waiting for {}.", + stageName, endpointName, incomplete); + } + } + } + if (done) { + if (future.complete(null)) { + log.info("{} completed for endpoint {}. Endpoint is now READY.", + stageName, endpointName); + } + } + } + + void failStage(String what, Throwable exception) { + if (future.completeExceptionally(exception)) { + synchronized (EndpointReadyFuture.this) { + incomplete.clear(); + } + log.warn("Endpoint {} will never become ready because we encountered an {} exception", + endpointName, what, exception); + } + } + } + + private final Logger log; + + private final Map> futures; + + private EndpointReadyFutures( + LogContext logContext, + Map> endpointStages + ) { + this.log = logContext.logger(EndpointReadyFutures.class); + Map> newFutures = new HashMap<>(); + endpointStages.forEach((endpoint, stages) -> { + List stageNames = new ArrayList<>(); + stages.forEach(stage -> stageNames.add(stage.name)); + EndpointReadyFuture readyFuture = new EndpointReadyFuture(endpoint, stageNames); + newFutures.put(endpoint, readyFuture.future); + stages.forEach(stage -> { + stage.future.whenComplete((__, exception) -> { + if (exception != null) { + readyFuture.failStage(stage.name, exception); + } else { + readyFuture.completeStage(stage.name); + } + }); + }); + }); + this.futures = Collections.unmodifiableMap(newFutures); + } + + public Map> futures() { + return futures; + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java b/server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java new file mode 100644 index 00000000000..1c81379e8e2 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.network; + +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.server.authorizer.AuthorizerServerInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + + +/** + * Runtime broker configuration metadata provided to authorizers during start up. + */ +public final class KafkaAuthorizerServerInfo implements AuthorizerServerInfo { + private final ClusterResource clusterResource; + private final int brokerId; + private final Collection endpoints; + private final Endpoint interbrokerEndpoint; + private final Collection earlyStartListeners; + + public KafkaAuthorizerServerInfo( + ClusterResource clusterResource, + int brokerId, + Collection endpoints, + Endpoint interbrokerEndpoint, + Collection earlyStartListeners + ) { + this.clusterResource = clusterResource; + this.brokerId = brokerId; + this.endpoints = Collections.unmodifiableCollection(new ArrayList<>(endpoints)); + this.interbrokerEndpoint = interbrokerEndpoint; + this.earlyStartListeners = Collections.unmodifiableCollection(new ArrayList<>(earlyStartListeners)); + } + + @Override + public ClusterResource clusterResource() { + return clusterResource; + } + + @Override + public int brokerId() { + return brokerId; + } + + @Override + public Collection endpoints() { + return endpoints; + } + + @Override + public Endpoint interBrokerEndpoint() { + return interbrokerEndpoint; + } + + @Override + public Collection earlyStartListeners() { + return earlyStartListeners; + } + + @Override + public boolean equals(Object o) { + if (o == null || (!(o.getClass().equals(KafkaAuthorizerServerInfo.class)))) return false; + KafkaAuthorizerServerInfo other = (KafkaAuthorizerServerInfo) o; + return clusterResource.equals(other.clusterResource) && + brokerId == other.brokerId && + endpoints.equals(other.endpoints) && + interbrokerEndpoint.equals(other.interbrokerEndpoint) && + earlyStartListeners.equals(other.earlyStartListeners); + } + + @Override + public int hashCode() { + return Objects.hash(clusterResource, + brokerId, + endpoints, + interbrokerEndpoint, + earlyStartListeners); + } + + @Override + public String toString() { + return "KafkaAuthorizerServerInfo(" + + "clusterResource=" + clusterResource + + ", brokerId=" + brokerId + + ", endpoints=" + endpoints + + ", earlyStartListeners=" + earlyStartListeners + + ")"; + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java new file mode 100644 index 00000000000..c3d8e0f0319 --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +class FeaturesTest { + @Test + public void testKRaftModeFeatures() { + Features features = new Features(MINIMUM_KRAFT_VERSION, + Collections.singletonMap("foo", (short) 2), 123, true); + assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), + features.finalizedFeatures().get(FEATURE_NAME)); + assertEquals((short) 2, + features.finalizedFeatures().get("foo")); + assertEquals(2, features.finalizedFeatures().size()); + } + + @Test + public void testZkModeFeatures() { + Features features = new Features(MINIMUM_KRAFT_VERSION, + Collections.singletonMap("foo", (short) 2), 123, false); + assertNull(features.finalizedFeatures().get(FEATURE_NAME)); + assertEquals((short) 2, + features.finalizedFeatures().get("foo")); + assertEquals(1, features.finalizedFeatures().size()); + } +} diff --git a/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java new file mode 100644 index 00000000000..2d76e5df37d --- /dev/null +++ b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.network; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +final public class EndpointReadyFuturesTest { + private static final Endpoint EXTERNAL = + new Endpoint("EXTERNAL", SecurityProtocol.SSL, "127.0.0.1", 9092); + + private static final Endpoint INTERNAL = + new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "127.0.0.1", 9093); + + private static final KafkaAuthorizerServerInfo INFO = new KafkaAuthorizerServerInfo( + new ClusterResource("S6-01LPiQOCBhhFIunQUcQ"), + 1, + Arrays.asList(EXTERNAL, INTERNAL), + INTERNAL, + Arrays.asList("INTERNAL")); + + static void assertComplete( + EndpointReadyFutures readyFutures, + Endpoint... endpoints + ) { + for (Endpoint endpoint : endpoints) { + String name = endpoint.listenerName().get(); + CompletableFuture future = readyFutures.futures().get(endpoint); + assertNotNull(future, "Unable to find future for " + name); + assertTrue(future.isDone(), "Future for " + name + " is not done."); + assertFalse(future.isCompletedExceptionally(), + "Future for " + name + " is completed exceptionally."); + } + } + + static void assertIncomplete( + EndpointReadyFutures readyFutures, + Endpoint... endpoints + ) { + for (Endpoint endpoint : endpoints) { + CompletableFuture future = readyFutures.futures().get(endpoint); + assertNotNull(future, "Unable to find future for " + endpoint); + assertFalse(future.isDone(), "Future for " + endpoint + " is done."); + } + } + + static void assertException( + EndpointReadyFutures readyFutures, + Throwable throwable, + Endpoint... endpoints + ) { + for (Endpoint endpoint : endpoints) { + CompletableFuture future = readyFutures.futures().get(endpoint); + assertNotNull(future, "Unable to find future for " + endpoint); + assertTrue(future.isCompletedExceptionally(), + "Future for " + endpoint + " is not completed exceptionally."); + Throwable cause = assertThrows(CompletionException.class, + () -> future.getNow(null)).getCause(); + assertNotNull(cause, "Unable to find CompletionException cause for " + endpoint); + assertEquals(throwable.getClass(), cause.getClass()); + assertEquals(throwable.getMessage(), cause.getMessage()); + } + } + + @Test + public void testImmediateCompletion() { + EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder(). + build(Optional.empty(), INFO); + assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)), + readyFutures.futures().keySet()); + assertComplete(readyFutures, EXTERNAL, INTERNAL); + } + + @Test + public void testAddReadinessFuture() { + CompletableFuture foo = new CompletableFuture<>(); + EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder(). + addReadinessFuture("foo", foo). + build(Optional.empty(), INFO); + assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)), + readyFutures.futures().keySet()); + assertIncomplete(readyFutures, EXTERNAL, INTERNAL); + foo.complete(null); + assertComplete(readyFutures, EXTERNAL, INTERNAL); + } + + @Test + public void testAddMultipleReadinessFutures() { + CompletableFuture foo = new CompletableFuture<>(); + CompletableFuture bar = new CompletableFuture<>(); + EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder(). + addReadinessFuture("foo", foo). + addReadinessFuture("bar", bar). + build(Optional.empty(), INFO); + assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)), + readyFutures.futures().keySet()); + assertIncomplete(readyFutures, EXTERNAL, INTERNAL); + foo.complete(null); + assertIncomplete(readyFutures, EXTERNAL, INTERNAL); + bar.complete(null); + assertComplete(readyFutures, EXTERNAL, INTERNAL); + } + + @Test + public void testAddReadinessFutures() { + Map> bazFutures = new HashMap<>(); + bazFutures.put(EXTERNAL, new CompletableFuture<>()); + bazFutures.put(INTERNAL, new CompletableFuture<>()); + EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder(). + addReadinessFutures("baz", bazFutures). + build(Optional.empty(), INFO); + assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)), + readyFutures.futures().keySet()); + assertIncomplete(readyFutures, EXTERNAL, INTERNAL); + bazFutures.get(EXTERNAL).complete(null); + assertComplete(readyFutures, EXTERNAL); + assertIncomplete(readyFutures, INTERNAL); + bazFutures.get(INTERNAL).complete(null); + assertComplete(readyFutures, EXTERNAL, INTERNAL); + } + + @Test + public void testFailedReadinessFuture() { + CompletableFuture foo = new CompletableFuture<>(); + CompletableFuture bar = new CompletableFuture<>(); + EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder(). + addReadinessFuture("foo", foo). + addReadinessFuture("bar", bar). + build(Optional.empty(), INFO); + assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)), + readyFutures.futures().keySet()); + assertIncomplete(readyFutures, EXTERNAL, INTERNAL); + foo.complete(null); + assertIncomplete(readyFutures, EXTERNAL, INTERNAL); + bar.completeExceptionally(new RuntimeException("Failed.")); + assertException(readyFutures, new RuntimeException("Failed."), + EXTERNAL, INTERNAL); + } +}