Browse Source

KAFKA-15060: fix the ApiVersionManager interface

This PR expands the scope of ApiVersionManager a bit to include returning the current
MetadataVersion and features that are in effect. This is useful in general because that information
needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager
interface so that all subclasses implement all methods of the interface. Having subclasses that
don't implement some methods is dangerous because they could cause exceptions at runtime in
unexpected scenarios.

On the KRaft controller, we were previously performing a read operation in the QuorumController
thread to get the current metadata version and features. With this PR, we now read a volatile
variable maintained by a separate MetadataVersionContextPublisher object. This will improve
performance and simplify the code. It should not change the guarantees we are providing; in both
the old and new scenarios, we need to be robust against version skew scenarios during updates.

Add a Features class which just has a 3-tuple of metadata version, features, and feature epoch.
Remove MetadataCache.FinalizedFeaturesAndEpoch, since it just duplicates the Features class.
(There are some additional feature-related classes that can be consolidated in in a follow-on PR.)

Create a java class, EndpointReadyFutures, for managing the futures associated with individual
authorizer endpoints. This avoids code duplication between ControllerServer and BrokerServer and
makes this code unit-testable.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
pull/13877/head
Colin P. McCabe 1 year ago
parent
commit
cd3c0ab1a3
  1. 4
      checkstyle/import-control-server-common.xml
  2. 8
      core/src/main/scala/kafka/controller/KafkaController.scala
  3. 55
      core/src/main/scala/kafka/server/ApiVersionManager.scala
  4. 28
      core/src/main/scala/kafka/server/BrokerServer.scala
  5. 15
      core/src/main/scala/kafka/server/ControllerApis.scala
  6. 31
      core/src/main/scala/kafka/server/ControllerServer.scala
  7. 13
      core/src/main/scala/kafka/server/MetadataCache.scala
  8. 18
      core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
  9. 47
      core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
  10. 2
      core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
  11. 7
      core/src/main/scala/kafka/tools/TestRaftServer.scala
  12. 6
      core/src/test/scala/unit/kafka/network/SocketServerTest.scala
  13. 12
      core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
  14. 10
      core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
  15. 20
      core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
  16. 21
      core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
  17. 10
      core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
  18. 8
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
  19. 7
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
  20. 54
      metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
  21. 87
      server-common/src/main/java/org/apache/kafka/server/common/Features.java
  22. 219
      server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
  23. 108
      server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java
  24. 50
      server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
  25. 169
      server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java

4
checkstyle/import-control-server-common.xml

@ -77,6 +77,10 @@ @@ -77,6 +77,10 @@
<subpackage name="metrics">
<allow pkg="com.yammer.metrics" />
</subpackage>
<subpackage name="network">
<allow pkg="org.apache.kafka.server.authorizer" />
</subpackage>
</subpackage>
</import-control>

8
core/src/main/scala/kafka/controller/KafkaController.scala

@ -1598,7 +1598,9 @@ class KafkaController(val config: KafkaConfig, @@ -1598,7 +1598,9 @@ class KafkaController(val config: KafkaConfig,
!config.isFeatureVersioningSupported ||
!featureCache.getFeatureOption.exists(
latestFinalizedFeatures =>
BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.features))
BrokerFeatures.hasIncompatibleFeatures(broker.features,
latestFinalizedFeatures.finalizedFeatures().asScala.
map(kv => (kv._1, kv._2.toShort)).toMap))
}
}
@ -2081,8 +2083,8 @@ class KafkaController(val config: KafkaConfig, @@ -2081,8 +2083,8 @@ class KafkaController(val config: KafkaConfig,
callback: UpdateFeaturesCallback): Unit = {
val updates = request.featureUpdates
val existingFeatures = featureCache.getFeatureOption
.map(featuresAndEpoch => featuresAndEpoch.features)
.getOrElse(Map[String, Short]())
.map(featuresAndEpoch => featuresAndEpoch.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap)
.getOrElse(Map[String, Short]())
// A map with key being feature name and value being finalized version.
// This contains the target features to be eventually written to FeatureZNode.
val targetFeatures = scala.collection.mutable.Map[String, Short]() ++ existingFeatures

55
core/src/main/scala/kafka/server/ApiVersionManager.scala

@ -18,10 +18,11 @@ package kafka.server @@ -18,10 +18,11 @@ package kafka.server
import kafka.network
import kafka.network.RequestChannel
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
import org.apache.kafka.server.common.Features
import scala.jdk.CollectionConverters._
@ -30,22 +31,14 @@ trait ApiVersionManager { @@ -30,22 +31,14 @@ trait ApiVersionManager {
def listenerType: ListenerType
def enabledApis: collection.Set[ApiKeys]
/**
* @see [[DefaultApiVersionManager.apiVersionResponse]]
* @see [[kafka.server.KafkaApis.handleApiVersionsRequest]]
*/
def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
/**
* @see [[SimpleApiVersionManager.apiVersionResponse]]
* @see [[kafka.server.ControllerApis.handleApiVersionsRequest]]
*/
def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse
def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
}
def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
def features: Features
}
object ApiVersionManager {
@ -80,41 +73,43 @@ object ApiVersionManager { @@ -80,41 +73,43 @@ object ApiVersionManager {
class SimpleApiVersionManager(
val listenerType: ListenerType,
val enabledApis: collection.Set[ApiKeys],
brokerFeatures: Features[SupportedVersionRange],
brokerFeatures: org.apache.kafka.common.feature.Features[SupportedVersionRange],
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean
val zkMigrationEnabled: Boolean,
val featuresProvider: () => Features
) extends ApiVersionManager {
def this(
listenerType: ListenerType,
enableUnstableLastVersion: Boolean,
zkMigrationEnabled: Boolean
zkMigrationEnabled: Boolean,
featuresProvider: () => Features
) = {
this(
listenerType,
ApiKeys.apisForListener(listenerType).asScala,
BrokerFeatures.defaultSupportedFeatures(),
enableUnstableLastVersion,
zkMigrationEnabled
zkMigrationEnabled,
featuresProvider
)
}
private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
throw new UnsupportedOperationException("This method is not supported in SimpleApiVersionManager, use apiVersionResponse(throttleTimeMs, finalizedFeatures, epoch) instead")
}
override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
val currentFeatures = features
ApiVersionsResponse.createApiVersionsResponse(
throttleTimeMs,
apiVersions,
brokerFeatures,
finalizedFeatures.asJava,
finalizedFeaturesEpoch,
currentFeatures.finalizedFeatures(),
currentFeatures.finalizedFeaturesEpoch(),
zkMigrationEnabled
)
}
override def features: Features = featuresProvider.apply()
}
/**
@ -124,7 +119,7 @@ class SimpleApiVersionManager( @@ -124,7 +119,7 @@ class SimpleApiVersionManager(
*
* @param listenerType the listener type
* @param forwardingManager the forwarding manager,
* @param features the broker features
* @param brokerFeatures the broker features
* @param metadataCache the metadata cache, used to get the finalized features and the metadata version
* @param enableUnstableLastVersion whether to enable unstable last version, see [[KafkaConfig.unstableApiVersionsEnabled]]
* @param zkMigrationEnabled whether to enable zk migration, see [[KafkaConfig.migrationEnabled]]
@ -132,7 +127,7 @@ class SimpleApiVersionManager( @@ -132,7 +127,7 @@ class SimpleApiVersionManager(
class DefaultApiVersionManager(
val listenerType: ListenerType,
forwardingManager: Option[ForwardingManager],
features: BrokerFeatures,
brokerFeatures: BrokerFeatures,
metadataCache: MetadataCache,
val enableUnstableLastVersion: Boolean,
val zkMigrationEnabled: Boolean = false
@ -141,16 +136,16 @@ class DefaultApiVersionManager( @@ -141,16 +136,16 @@ class DefaultApiVersionManager(
val enabledApis = ApiKeys.apisForListener(listenerType).asScala
override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
val supportedFeatures = features.supportedFeatures
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeatures = metadataCache.features()
val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
ApiVersionsResponse.createApiVersionsResponse(
throttleTimeMs,
metadataCache.metadataVersion().highestSupportedRecordVersion,
finalizedFeatures.metadataVersion().highestSupportedRecordVersion,
supportedFeatures,
finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
finalizedFeatures.epoch,
finalizedFeatures.finalizedFeatures(),
finalizedFeatures.finalizedFeaturesEpoch(),
controllerApiVersions.orNull,
listenerType,
enableUnstableLastVersion,
@ -158,7 +153,5 @@ class DefaultApiVersionManager( @@ -158,7 +153,5 @@ class DefaultApiVersionManager(
)
}
override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")
}
override def features: Features = metadataCache.features()
}

28
core/src/main/scala/kafka/server/BrokerServer.scala

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package kafka.server
import kafka.cluster.Broker.ServerInfo
import kafka.cluster.EndPoint
import kafka.coordinator.group.GroupCoordinatorAdapter
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
@ -48,6 +47,7 @@ import org.apache.kafka.server.authorizer.Authorizer @@ -48,6 +47,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
@ -366,25 +366,10 @@ class BrokerServer( @@ -366,25 +366,10 @@ class BrokerServer(
config.interBrokerListenerName.value() + ". Found listener(s): " +
endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
}
val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
config.nodeId,
endpoints,
interBrokerListener,
config.earlyStartListeners.map(_.value()).asJava)
// Create and initialize an authorizer if one is configured.
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
case Some(authZ) =>
authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}
case None =>
authorizerInfo.endpoints.asScala.map { ep =>
ep -> CompletableFuture.completedFuture[Void](null)
}.toMap
}
val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
@ -504,6 +489,17 @@ class BrokerServer( @@ -504,6 +489,17 @@ class BrokerServer(
// Enable inbound TCP connections. Each endpoint will be started only once its matching
// authorizer future is completed.
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizer.asJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
endpoints,
interBrokerListener,
config.earlyStartListeners.map(_.value()).asJava))
}
val authorizerFutures = endpointReadyFutures.futures().asScala.toMap
val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures)
// Block here until all the authorizer futures are complete.

15
core/src/main/scala/kafka/server/ControllerApis.scala

@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel, @@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel,
if (apiVersionRequest.hasUnsupportedRequestVersion) {
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else if (!apiVersionRequest.isValid) {
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
CompletableFuture.completedFuture[Unit](())
} else {
val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
controller.finalizedFeatures(context).handle { (result, exception) =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
if (exception != null) {
apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
} else {
apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
}
})
}
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs))
}
CompletableFuture.completedFuture[Unit](())
}
def authorizeAlterResource(requestContext: RequestContext,

31
core/src/main/scala/kafka/server/ControllerServer.scala

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package kafka.server
import kafka.cluster.Broker.ServerInfo
import kafka.metrics.LinuxIoMetricsCollector
import kafka.migration.MigrationPropagator
import kafka.network.{DataPlaneAcceptor, SocketServer}
@ -43,10 +42,12 @@ import org.apache.kafka.metadata.KafkaConfigSchema @@ -43,10 +42,12 @@ import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
import org.apache.kafka.metadata.publisher.FeaturesPublisher
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.util.{Deadline, FutureUtils}
@ -115,6 +116,7 @@ class ControllerServer( @@ -115,6 +116,7 @@ class ControllerServer(
var migrationSupport: Option[ControllerMigrationSupport] = None
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
val metadataPublishers: util.List[MetadataPublisher] = new util.ArrayList[MetadataPublisher]()
val featuresPublisher = new FeaturesPublisher()
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
@ -153,30 +155,22 @@ class ControllerServer( @@ -153,30 +155,22 @@ class ControllerServer(
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
case Some(authZ) =>
// It would be nice to remove some of the broker-specific assumptions from
// AuthorizerServerInfo, such as the assumption that there is an inter-broker
// listener, or that ID is named brokerId.
val controllerAuthorizerInfo = ServerInfo(
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizer.asJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
javaListeners,
javaListeners.get(0),
config.earlyStartListeners.map(_.value()).asJava)
authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}.toMap
case None =>
javaListeners.asScala.map {
ep => ep -> CompletableFuture.completedFuture[Void](null)
}.toMap
config.earlyStartListeners.map(_.value()).asJava))
}
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled,
config.migrationEnabled
config.migrationEnabled,
() => featuresPublisher.features()
)
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
@ -302,6 +296,8 @@ class ControllerServer( @@ -302,6 +296,8 @@ class ControllerServer(
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix)
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = endpointReadyFutures.futures().asScala.toMap
/**
* Enable the controller endpoint(s). If we are using an authorizer which stores
* ACLs in the metadata log, such as StandardAuthorizer, we will be able to start
@ -326,6 +322,9 @@ class ControllerServer( @@ -326,6 +322,9 @@ class ControllerServer(
// register this instance for dynamic config changes to the KafkaConfig
config.dynamicConfig.addReconfigurables(this)
// Set up the metadata features publisher.
metadataPublishers.add(featuresPublisher)
// Set up the dynamic config publisher. This runs even in combined mode, since the broker
// has its own separate dynamic configuration object.
metadataPublishers.add(new DynamicConfigPublisher(

13
core/src/main/scala/kafka/server/MetadataCache.scala

@ -22,16 +22,10 @@ import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} @@ -22,16 +22,10 @@ import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}
import java.util
case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) {
override def toString(): String = {
s"FinalizedFeaturesAndEpoch(features=$features, epoch=$epoch)"
}
}
/**
* Used to represent the controller id cached in the metadata cache of the broker. This trait is
* extended to represent if the controller is KRaft controller or Zk controller.
@ -44,7 +38,6 @@ case class ZkCachedControllerId(id: Int) extends CachedControllerId @@ -44,7 +38,6 @@ case class ZkCachedControllerId(id: Int) extends CachedControllerId
case class KRaftCachedControllerId(id: Int) extends CachedControllerId
trait MetadataCache {
/**
* Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details
* on the use of the two boolean flags.
@ -113,9 +106,9 @@ trait MetadataCache { @@ -113,9 +106,9 @@ trait MetadataCache {
def metadataVersion(): MetadataVersion
def features(): FinalizedFeaturesAndEpoch
def getRandomAliveBrokerId: Option[Int]
def features(): Features
}
object MetadataCache {

18
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala

@ -18,7 +18,7 @@ @@ -18,7 +18,7 @@
package kafka.server.metadata
import kafka.controller.StateChangeLogger
import kafka.server.{CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache}
import kafka.server.{CachedControllerId, KRaftCachedControllerId, MetadataCache}
import kafka.utils.Logging
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
@ -37,7 +37,7 @@ import org.apache.kafka.common.config.ConfigResource @@ -37,7 +37,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData}
import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}
import scala.collection.{Seq, Set, mutable}
import scala.jdk.CollectionConverters._
@ -393,15 +393,11 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w @@ -393,15 +393,11 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
override def metadataVersion(): MetadataVersion = _currentImage.features().metadataVersion()
override def features(): FinalizedFeaturesAndEpoch = {
override def features(): Features = {
val image = _currentImage
val features = image.features().finalizedVersions().asScala.map {
case (name: String, level: java.lang.Short) => name -> Short2short(level)
}
features.put(MetadataVersion.FEATURE_NAME, image.features().metadataVersion().featureLevel())
FinalizedFeaturesAndEpoch(
features.toMap,
image.highestOffsetAndEpoch().offset)
new Features(image.features().metadataVersion(),
image.features().finalizedVersions(),
image.highestOffsetAndEpoch().offset,
true)
}
}

47
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala

@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._ @@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.api._
import kafka.controller.StateChangeLogger
import kafka.server.{BrokerFeatures, CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
import kafka.server.{BrokerFeatures, CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import kafka.utils.Implicits._
@ -40,7 +40,7 @@ import org.apache.kafka.common.network.ListenerName @@ -40,7 +40,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.concurrent.TimeoutException
@ -53,7 +53,7 @@ class FeatureCacheUpdateException(message: String) extends RuntimeException(mess @@ -53,7 +53,7 @@ class FeatureCacheUpdateException(message: String) extends RuntimeException(mess
trait ZkFinalizedFeatureCache {
def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit
def getFeatureOption: Option[FinalizedFeaturesAndEpoch]
def getFeatureOption: Option[Features]
}
/**
@ -83,7 +83,7 @@ class ZkMetadataCache( @@ -83,7 +83,7 @@ class ZkMetadataCache(
private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
// Features are updated via ZK notification (see FinalizedFeatureChangeListener)
@volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty
@volatile private var _features: Option[Features] = Option.empty
private val featureLock = new ReentrantLock()
private val featureCond = featureLock.newCondition()
@ -488,11 +488,12 @@ class ZkMetadataCache( @@ -488,11 +488,12 @@ class ZkMetadataCache(
override def metadataVersion(): MetadataVersion = metadataVersion
override def features(): FinalizedFeaturesAndEpoch = {
featuresAndEpoch match {
case Some(features) => features
case None => FinalizedFeaturesAndEpoch(Map.empty, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH)
}
override def features(): Features = _features match {
case Some(features) => features
case None => new Features(metadataVersion,
Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
false)
}
/**
@ -509,14 +510,18 @@ class ZkMetadataCache( @@ -509,14 +510,18 @@ class ZkMetadataCache(
* not modified.
*/
def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
val latest = FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)
val existing = featuresAndEpoch.map(item => item.toString()).getOrElse("<empty>")
if (featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch > latest.epoch) {
val latest = new Features(metadataVersion,
latestFeatures.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
latestEpoch,
false)
val existing = _features
if (existing.isDefined && existing.get.finalizedFeaturesEpoch() > latest.finalizedFeaturesEpoch()) {
val errorMsg = s"FinalizedFeatureCache update failed due to invalid epoch in new $latest." +
s" The existing cache contents are $existing."
throw new FeatureCacheUpdateException(errorMsg)
} else {
val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features)
val incompatibleFeatures = brokerFeatures.incompatibleFeatures(
latest.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap)
if (incompatibleFeatures.nonEmpty) {
val errorMsg = "FinalizedFeatureCache update failed since feature compatibility" +
s" checks failed! Supported ${brokerFeatures.supportedFeatures} has incompatibilities" +
@ -525,7 +530,7 @@ class ZkMetadataCache( @@ -525,7 +530,7 @@ class ZkMetadataCache(
} else {
val logMsg = s"Updated cache from existing $existing to latest $latest."
inLock(featureLock) {
featuresAndEpoch = Some(latest)
_features = Some(latest)
featureCond.signalAll()
}
info(logMsg)
@ -533,13 +538,12 @@ class ZkMetadataCache( @@ -533,13 +538,12 @@ class ZkMetadataCache(
}
}
/**
* Clears all existing finalized features and epoch from the cache.
*/
def clearFeatures(): Unit = {
inLock(featureLock) {
featuresAndEpoch = None
_features = None
featureCond.signalAll()
}
}
@ -565,12 +569,12 @@ class ZkMetadataCache( @@ -565,12 +569,12 @@ class ZkMetadataCache(
}
val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1000000)
inLock(featureLock) {
while (!(featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch >= minExpectedEpoch)) {
while (!(_features.isDefined && _features.get.finalizedFeaturesEpoch() >= minExpectedEpoch)) {
val nowNanos = System.nanoTime()
if (nowNanos > waitEndTimeNanos) {
throw new TimeoutException(
s"Timed out after waiting for ${timeoutMs}ms for required condition to be met." +
s" Current epoch: ${featuresAndEpoch.map(fe => fe.epoch).getOrElse("<none>")}.")
s" Current epoch: ${_features.map(fe => fe.finalizedFeaturesEpoch()).getOrElse("<none>")}.")
}
val sleepTimeMs = max(1L, (waitEndTimeNanos - nowNanos) / 1000000)
featureCond.await(sleepTimeMs, TimeUnit.MILLISECONDS)
@ -578,10 +582,5 @@ class ZkMetadataCache( @@ -578,10 +582,5 @@ class ZkMetadataCache(
}
}
/**
* @return the latest known FinalizedFeaturesAndEpoch or empty if not defined in the cache.
*/
def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = {
featuresAndEpoch
}
override def getFeatureOption: Option[Features] = _features
}

2
core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala

@ -64,7 +64,7 @@ class TestRaftRequestHandler( @@ -64,7 +64,7 @@ class TestRaftRequestHandler(
}
private def handleApiVersions(request: RequestChannel.Request): Unit = {
requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0, Map.empty, -1), None)
requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0), None)
}
private def handleVote(request: RequestChannel.Request): Unit = {

7
core/src/main/scala/kafka/tools/TestRaftServer.scala

@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{Time, Utils} @@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.fault.ProcessTerminatingFaultHandler
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils, ShutdownableThread}
@ -74,7 +75,11 @@ class TestRaftServer( @@ -74,7 +75,11 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
true,
false,
() => Features.fromKRaftVersion(MetadataVersion.MINIMUM_KRAFT_VERSION))
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
val metaProperties = MetaProperties(

6
core/src/test/scala/unit/kafka/network/SocketServerTest.scala

@ -24,7 +24,7 @@ import java.nio.channels.{SelectionKey, SocketChannel} @@ -24,7 +24,7 @@ import java.nio.channels.{SelectionKey, SocketChannel}
import java.nio.charset.StandardCharsets
import java.util
import java.util.concurrent.{CompletableFuture, ConcurrentLinkedQueue, ExecutionException, Executors, TimeUnit}
import java.util.{Properties, Random}
import java.util.{Collections, Properties, Random}
import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode}
import com.yammer.metrics.core.{Gauge, Meter}
@ -46,6 +46,7 @@ import org.apache.kafka.common.requests._ @@ -46,6 +46,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, MockTime, Time, Utils}
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.log4j.Level
import org.junit.jupiter.api.Assertions._
@ -77,7 +78,8 @@ class SocketServerTest { @@ -77,7 +78,8 @@ class SocketServerTest {
// Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics()
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false)
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, false,
() => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, true))
val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val sockets = new ArrayBuffer[Socket]

12
core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala

@ -39,7 +39,7 @@ class ApiVersionManagerTest { @@ -39,7 +39,7 @@ class ApiVersionManagerTest {
val versionManager = new DefaultApiVersionManager(
listenerType = apiScope,
forwardingManager = None,
features = brokerFeatures,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
)
@ -57,7 +57,7 @@ class ApiVersionManagerTest { @@ -57,7 +57,7 @@ class ApiVersionManagerTest {
val versionManager = new DefaultApiVersionManager(
listenerType = apiScope,
forwardingManager = None,
features = brokerFeatures,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = false
)
@ -86,7 +86,7 @@ class ApiVersionManagerTest { @@ -86,7 +86,7 @@ class ApiVersionManagerTest {
val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.ZK_BROKER,
forwardingManager = Some(forwardingManager),
features = brokerFeatures,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
)
@ -107,7 +107,7 @@ class ApiVersionManagerTest { @@ -107,7 +107,7 @@ class ApiVersionManagerTest {
val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.BROKER,
forwardingManager = forwardingManagerOpt,
features = brokerFeatures,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
)
@ -129,7 +129,7 @@ class ApiVersionManagerTest { @@ -129,7 +129,7 @@ class ApiVersionManagerTest {
val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.ZK_BROKER,
forwardingManager = Some(forwardingManager),
features = brokerFeatures,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
)
@ -148,7 +148,7 @@ class ApiVersionManagerTest { @@ -148,7 +148,7 @@ class ApiVersionManagerTest {
val versionManager = new DefaultApiVersionManager(
listenerType = ListenerType.ZK_BROKER,
forwardingManager = None,
features = brokerFeatures,
brokerFeatures = brokerFeatures,
metadataCache = metadataCache,
enableUnstableLastVersion = true
)

10
core/src/test/scala/unit/kafka/server/ControllerApisTest.scala

@ -46,12 +46,12 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} @@ -46,12 +46,12 @@ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.{ElectionType, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.controller.{Controller, ControllerRequestContext, ResultOrError}
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion, ProducerIdsBlock}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@ -155,7 +155,11 @@ class ControllerApisTest { @@ -155,7 +155,11 @@ class ControllerApisTest {
new KafkaConfig(props),
MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
Seq.empty,
new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
new SimpleApiVersionManager(
ListenerType.CONTROLLER,
true,
false,
() => Features.fromKRaftVersion(MetadataVersion.latest()))
)
}

20
core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala

@ -32,6 +32,10 @@ class FinalizedFeatureCacheTest { @@ -32,6 +32,10 @@ class FinalizedFeatureCacheTest {
assertTrue(new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, BrokerFeatures.createDefault()).getFeatureOption.isEmpty)
}
def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = {
input.map(kv => kv._1 -> kv._2.asInstanceOf[java.lang.Short]).asJava
}
@Test
def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = {
val supportedFeatures = Map[String, SupportedVersionRange](
@ -44,15 +48,15 @@ class FinalizedFeatureCacheTest { @@ -44,15 +48,15 @@ class FinalizedFeatureCacheTest {
val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures)
cache.updateFeaturesOrThrow(finalizedFeatures, 10)
assertTrue(cache.getFeatureOption.isDefined)
assertEquals(finalizedFeatures, cache.getFeatureOption.get.features)
assertEquals(10, cache.getFeatureOption.get.epoch)
assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch())
assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateFeaturesOrThrow(finalizedFeatures, 9))
// Check that the failed updateOrThrow call did not make any mutations.
assertTrue(cache.getFeatureOption.isDefined)
assertEquals(finalizedFeatures, cache.getFeatureOption.get.features)
assertEquals(10, cache.getFeatureOption.get.epoch)
assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
assertEquals(10, cache.getFeatureOption.get.finalizedFeaturesEpoch())
}
@Test
@ -83,8 +87,8 @@ class FinalizedFeatureCacheTest { @@ -83,8 +87,8 @@ class FinalizedFeatureCacheTest {
val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures)
cache.updateFeaturesOrThrow(finalizedFeatures, 12)
assertTrue(cache.getFeatureOption.isDefined)
assertEquals(finalizedFeatures, cache.getFeatureOption.get.features)
assertEquals(12, cache.getFeatureOption.get.epoch)
assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch())
}
@Test
@ -99,8 +103,8 @@ class FinalizedFeatureCacheTest { @@ -99,8 +103,8 @@ class FinalizedFeatureCacheTest {
val cache = new ZkMetadataCache(1, MetadataVersion.IBP_2_8_IV1, brokerFeatures)
cache.updateFeaturesOrThrow(finalizedFeatures, 12)
assertTrue(cache.getFeatureOption.isDefined)
assertEquals(finalizedFeatures, cache.getFeatureOption.get.features)
assertEquals(12, cache.getFeatureOption.get.epoch)
assertEquals(asJava(finalizedFeatures), cache.getFeatureOption.get.finalizedFeatures())
assertEquals(12, cache.getFeatureOption.get.finalizedFeaturesEpoch())
cache.clearFeatures()
assertTrue(cache.getFeatureOption.isEmpty)

21
core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala

@ -21,6 +21,7 @@ import kafka.server.metadata.ZkMetadataCache @@ -21,6 +21,7 @@ import kafka.server.metadata.ZkMetadataCache
import kafka.utils.TestUtils
import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.{Features => JFeatures}
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0
@ -32,6 +33,15 @@ import java.util.concurrent.{CountDownLatch, TimeoutException} @@ -32,6 +33,15 @@ import java.util.concurrent.{CountDownLatch, TimeoutException}
import scala.jdk.CollectionConverters._
class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) {
override def toString(): String = {
s"FinalizedFeaturesAndEpoch(features=$features, epoch=$epoch)"
}
}
def asJava(input: Map[String, Short]): java.util.Map[String, java.lang.Short] = {
input.map(kv => kv._1 -> kv._2.asInstanceOf[java.lang.Short]).asJava
}
private def createBrokerFeatures(): BrokerFeatures = {
val supportedFeaturesMap = Map[String, SupportedVersionRange](
@ -64,8 +74,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness { @@ -64,8 +74,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
val mayBeNewCacheContent = cache.getFeatureOption
assertFalse(mayBeNewCacheContent.isEmpty)
val newCacheContent = mayBeNewCacheContent.get
assertEquals(expectedCacheContent.get.features, newCacheContent.features)
assertEquals(expectedCacheContent.get.epoch, newCacheContent.epoch)
assertEquals(asJava(expectedCacheContent.get.features), newCacheContent.finalizedFeatures())
assertEquals(expectedCacheContent.get.epoch, newCacheContent.finalizedFeaturesEpoch())
} else {
val mayBeNewCacheContent = cache.getFeatureOption
assertTrue(mayBeNewCacheContent.isEmpty)
@ -94,7 +104,9 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness { @@ -94,7 +104,9 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
assertTrue(updatedVersion > initialFinalizedFeatures.epoch)
cache.waitUntilFeatureEpochOrThrow(updatedVersion, JTestUtils.DEFAULT_MAX_WAIT_MS)
assertEquals(FinalizedFeaturesAndEpoch(finalizedFeatures, updatedVersion), cache.getFeatureOption.get)
assertEquals(new JFeatures(MetadataVersion.IBP_2_8_IV1,
asJava(finalizedFeatures), updatedVersion, false),
cache.getFeatureOption.get)
assertTrue(listener.isListenerInitiated)
}
@ -248,7 +260,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness { @@ -248,7 +260,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
listener.isListenerDead &&
// Make sure the cache contents are as expected, and, the incompatible features were not
// applied.
cache.getFeatureOption.get.equals(initialFinalizedFeatures)
cache.getFeatureOption.get.equals(new JFeatures(MetadataVersion.IBP_2_8_IV1,
asJava(initialFinalizedFeatures.features), initialFinalizedFeatures.epoch, false))
}, "Timed out waiting for listener death and FinalizedFeatureCache to be updated")
} finally {
Exit.resetExitProcedure()

10
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

@ -93,7 +93,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti @@ -93,7 +93,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData}
@ -185,7 +185,13 @@ class KafkaApisTest { @@ -185,7 +185,13 @@ class KafkaApisTest {
} else {
ApiKeys.apisForListener(listenerType).asScala.toSet
}
val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)
val apiVersionManager = new SimpleApiVersionManager(
listenerType,
enabledApis,
BrokerFeatures.defaultSupportedFeatures(),
true,
false,
() => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
new KafkaApis(
requestChannel = requestChannel,

8
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java

@ -59,6 +59,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinator; @@ -59,6 +59,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -197,7 +199,11 @@ public class KRaftMetadataRequestBenchmark { @@ -197,7 +199,11 @@ public class KRaftMetadataRequestBenchmark {
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false)).
setApiVersionManager(new SimpleApiVersionManager(
ApiMessageType.ListenerType.BROKER,
false,
false,
() -> Features.fromKRaftVersion(MetadataVersion.latest()))).
build();
}

7
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java

@ -60,6 +60,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; @@ -60,6 +60,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
@ -199,7 +200,11 @@ public class MetadataRequestBenchmark { @@ -199,7 +200,11 @@ public class MetadataRequestBenchmark {
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false, false)).
setApiVersionManager(new SimpleApiVersionManager(
ApiMessageType.ListenerType.ZK_BROKER,
false,
false,
() -> Features.fromKRaftVersion(MetadataVersion.latest()))).
build();
}

54
metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java

@ -0,0 +1,54 @@ @@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.publisher;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.common.Features;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
public class FeaturesPublisher implements MetadataPublisher {
private volatile Features features = Features.fromKRaftVersion(MINIMUM_KRAFT_VERSION);
public Features features() {
return features;
}
@Override
public String name() {
return "FeaturesPublisher";
}
@Override
public void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage,
LoaderManifest manifest
) {
if (delta.featuresDelta() != null) {
features = new Features(newImage.features().metadataVersion(),
newImage.features().finalizedVersions(),
newImage.provenance().lastContainedOffset(),
true);
}
}
}

87
server-common/src/main/java/org/apache/kafka/server/common/Features.java

@ -0,0 +1,87 @@ @@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
public final class Features {
private final MetadataVersion version;
private final Map<String, Short> finalizedFeatures;
private final long finalizedFeaturesEpoch;
public static Features fromKRaftVersion(MetadataVersion version) {
return new Features(version, Collections.emptyMap(), -1, true);
}
public Features(
MetadataVersion version,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch,
boolean kraftMode
) {
this.version = version;
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
// In KRaft mode, we always include the metadata version in the features map.
// In ZK mode, we never include it.
if (kraftMode) {
this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
} else {
this.finalizedFeatures.remove(FEATURE_NAME);
}
}
public MetadataVersion metadataVersion() {
return version;
}
public Map<String, Short> finalizedFeatures() {
return finalizedFeatures;
}
public long finalizedFeaturesEpoch() {
return finalizedFeaturesEpoch;
}
@Override
public boolean equals(Object o) {
if (o == null || !(o.getClass().equals(Features.class))) return false;
Features other = (Features) o;
return version == other.version &&
finalizedFeatures.equals(other.finalizedFeatures) &&
finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
}
@Override
public int hashCode() {
return Objects.hash(version, finalizedFeatures, finalizedFeaturesEpoch);
}
@Override
public String toString() {
return "Features" +
"(version=" + version +
", finalizedFeatures=" + finalizedFeatures +
", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
")";
}
}

219
server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java

@ -0,0 +1,219 @@ @@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.network;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
* Manages a set of per-endpoint futures.
*/
public class EndpointReadyFutures {
public static class Builder {
private LogContext logContext = null;
private final Map<Endpoint, List<EndpointCompletionStage>> endpointStages = new HashMap<>();
private final List<EndpointCompletionStage> stages = new ArrayList<>();
/**
* Add a readiness future that will block all endpoints.
*
* @param name The future name.
* @param future The future object.
*
* @return This builder object.
*/
public Builder addReadinessFuture(
String name,
CompletableFuture<?> future
) {
stages.add(new EndpointCompletionStage(name, future));
return this;
}
/**
* Add readiness futures for individual endpoints.
*
* @param name The future name.
* @param newFutures A map from endpoints to futures.
*
* @return This builder object.
*/
public Builder addReadinessFutures(
String name,
Map<Endpoint, ? extends CompletionStage<?>> newFutures
) {
newFutures.forEach((endpoint, future) -> {
endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()).
add(new EndpointCompletionStage(name, future));
});
return this;
}
/**
* Build the EndpointReadyFutures object.
*
* @param authorizer The authorizer to use, if any. Will be started.
* @param info Server information to be passed to the authorizer.
*
* @return The new futures object.
*/
public EndpointReadyFutures build(
Optional<Authorizer> authorizer,
AuthorizerServerInfo info
) {
if (authorizer.isPresent()) {
return build(authorizer.get().start(info), info);
} else {
return build(Collections.emptyMap(), info);
}
}
EndpointReadyFutures build(
Map<Endpoint, ? extends CompletionStage<?>> authorizerStartFutures,
AuthorizerServerInfo info
) {
if (logContext == null) logContext = new LogContext();
Map<Endpoint, CompletionStage<?>> effectiveStartFutures =
new HashMap<>(authorizerStartFutures);
for (Endpoint endpoint : info.endpoints()) {
if (!effectiveStartFutures.containsKey(endpoint)) {
CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
effectiveStartFutures.put(endpoint, completedFuture);
}
}
if (info.endpoints().size() != effectiveStartFutures.size()) {
List<String> notInInfo = new ArrayList<>();
for (Endpoint endpoint : effectiveStartFutures.keySet()) {
if (!info.endpoints().contains(endpoint)) {
notInInfo.add(endpoint.listenerName().orElse("[none]"));
}
}
throw new RuntimeException("Found authorizer futures that weren't included " +
"in AuthorizerServerInfo: " + notInInfo);
}
addReadinessFutures("authorizerStart", effectiveStartFutures);
stages.forEach(stage -> {
Map<Endpoint, CompletionStage<?>> newReadinessFutures = new HashMap<>();
info.endpoints().forEach(endpoint -> {
newReadinessFutures.put(endpoint, stage.future);
});
addReadinessFutures(stage.name, newReadinessFutures);
});
return new EndpointReadyFutures(logContext,
endpointStages);
}
}
static class EndpointCompletionStage {
final String name;
final CompletionStage<?> future;
EndpointCompletionStage(String name, CompletionStage<?> future) {
this.name = name;
this.future = future;
}
}
class EndpointReadyFuture {
final String endpointName;
final TreeSet<String> incomplete;
final CompletableFuture<Void> future;
EndpointReadyFuture(Endpoint endpoint, Collection<String> stageNames) {
this.endpointName = endpoint.listenerName().orElse("UNNAMED");
this.incomplete = new TreeSet<>(stageNames);
this.future = new CompletableFuture<>();
}
void completeStage(String stageName) {
boolean done = false;
synchronized (EndpointReadyFuture.this) {
if (incomplete.remove(stageName)) {
if (incomplete.isEmpty()) {
done = true;
} else {
log.info("{} completed for endpoint {}. Still waiting for {}.",
stageName, endpointName, incomplete);
}
}
}
if (done) {
if (future.complete(null)) {
log.info("{} completed for endpoint {}. Endpoint is now READY.",
stageName, endpointName);
}
}
}
void failStage(String what, Throwable exception) {
if (future.completeExceptionally(exception)) {
synchronized (EndpointReadyFuture.this) {
incomplete.clear();
}
log.warn("Endpoint {} will never become ready because we encountered an {} exception",
endpointName, what, exception);
}
}
}
private final Logger log;
private final Map<Endpoint, CompletableFuture<Void>> futures;
private EndpointReadyFutures(
LogContext logContext,
Map<Endpoint, List<EndpointCompletionStage>> endpointStages
) {
this.log = logContext.logger(EndpointReadyFutures.class);
Map<Endpoint, CompletableFuture<Void>> newFutures = new HashMap<>();
endpointStages.forEach((endpoint, stages) -> {
List<String> stageNames = new ArrayList<>();
stages.forEach(stage -> stageNames.add(stage.name));
EndpointReadyFuture readyFuture = new EndpointReadyFuture(endpoint, stageNames);
newFutures.put(endpoint, readyFuture.future);
stages.forEach(stage -> {
stage.future.whenComplete((__, exception) -> {
if (exception != null) {
readyFuture.failStage(stage.name, exception);
} else {
readyFuture.completeStage(stage.name);
}
});
});
});
this.futures = Collections.unmodifiableMap(newFutures);
}
public Map<Endpoint, CompletableFuture<Void>> futures() {
return futures;
}
}

108
server-common/src/main/java/org/apache/kafka/server/network/KafkaAuthorizerServerInfo.java

@ -0,0 +1,108 @@ @@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.network;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
/**
* Runtime broker configuration metadata provided to authorizers during start up.
*/
public final class KafkaAuthorizerServerInfo implements AuthorizerServerInfo {
private final ClusterResource clusterResource;
private final int brokerId;
private final Collection<Endpoint> endpoints;
private final Endpoint interbrokerEndpoint;
private final Collection<String> earlyStartListeners;
public KafkaAuthorizerServerInfo(
ClusterResource clusterResource,
int brokerId,
Collection<Endpoint> endpoints,
Endpoint interbrokerEndpoint,
Collection<String> earlyStartListeners
) {
this.clusterResource = clusterResource;
this.brokerId = brokerId;
this.endpoints = Collections.unmodifiableCollection(new ArrayList<>(endpoints));
this.interbrokerEndpoint = interbrokerEndpoint;
this.earlyStartListeners = Collections.unmodifiableCollection(new ArrayList<>(earlyStartListeners));
}
@Override
public ClusterResource clusterResource() {
return clusterResource;
}
@Override
public int brokerId() {
return brokerId;
}
@Override
public Collection<Endpoint> endpoints() {
return endpoints;
}
@Override
public Endpoint interBrokerEndpoint() {
return interbrokerEndpoint;
}
@Override
public Collection<String> earlyStartListeners() {
return earlyStartListeners;
}
@Override
public boolean equals(Object o) {
if (o == null || (!(o.getClass().equals(KafkaAuthorizerServerInfo.class)))) return false;
KafkaAuthorizerServerInfo other = (KafkaAuthorizerServerInfo) o;
return clusterResource.equals(other.clusterResource) &&
brokerId == other.brokerId &&
endpoints.equals(other.endpoints) &&
interbrokerEndpoint.equals(other.interbrokerEndpoint) &&
earlyStartListeners.equals(other.earlyStartListeners);
}
@Override
public int hashCode() {
return Objects.hash(clusterResource,
brokerId,
endpoints,
interbrokerEndpoint,
earlyStartListeners);
}
@Override
public String toString() {
return "KafkaAuthorizerServerInfo(" +
"clusterResource=" + clusterResource +
", brokerId=" + brokerId +
", endpoints=" + endpoints +
", earlyStartListeners=" + earlyStartListeners +
")";
}
}

50
server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java

@ -0,0 +1,50 @@ @@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
class FeaturesTest {
@Test
public void testKRaftModeFeatures() {
Features features = new Features(MINIMUM_KRAFT_VERSION,
Collections.singletonMap("foo", (short) 2), 123, true);
assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
features.finalizedFeatures().get(FEATURE_NAME));
assertEquals((short) 2,
features.finalizedFeatures().get("foo"));
assertEquals(2, features.finalizedFeatures().size());
}
@Test
public void testZkModeFeatures() {
Features features = new Features(MINIMUM_KRAFT_VERSION,
Collections.singletonMap("foo", (short) 2), 123, false);
assertNull(features.finalizedFeatures().get(FEATURE_NAME));
assertEquals((short) 2,
features.finalizedFeatures().get("foo"));
assertEquals(1, features.finalizedFeatures().size());
}
}

169
server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java

@ -0,0 +1,169 @@ @@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.network;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
final public class EndpointReadyFuturesTest {
private static final Endpoint EXTERNAL =
new Endpoint("EXTERNAL", SecurityProtocol.SSL, "127.0.0.1", 9092);
private static final Endpoint INTERNAL =
new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "127.0.0.1", 9093);
private static final KafkaAuthorizerServerInfo INFO = new KafkaAuthorizerServerInfo(
new ClusterResource("S6-01LPiQOCBhhFIunQUcQ"),
1,
Arrays.asList(EXTERNAL, INTERNAL),
INTERNAL,
Arrays.asList("INTERNAL"));
static void assertComplete(
EndpointReadyFutures readyFutures,
Endpoint... endpoints
) {
for (Endpoint endpoint : endpoints) {
String name = endpoint.listenerName().get();
CompletableFuture<Void> future = readyFutures.futures().get(endpoint);
assertNotNull(future, "Unable to find future for " + name);
assertTrue(future.isDone(), "Future for " + name + " is not done.");
assertFalse(future.isCompletedExceptionally(),
"Future for " + name + " is completed exceptionally.");
}
}
static void assertIncomplete(
EndpointReadyFutures readyFutures,
Endpoint... endpoints
) {
for (Endpoint endpoint : endpoints) {
CompletableFuture<Void> future = readyFutures.futures().get(endpoint);
assertNotNull(future, "Unable to find future for " + endpoint);
assertFalse(future.isDone(), "Future for " + endpoint + " is done.");
}
}
static void assertException(
EndpointReadyFutures readyFutures,
Throwable throwable,
Endpoint... endpoints
) {
for (Endpoint endpoint : endpoints) {
CompletableFuture<Void> future = readyFutures.futures().get(endpoint);
assertNotNull(future, "Unable to find future for " + endpoint);
assertTrue(future.isCompletedExceptionally(),
"Future for " + endpoint + " is not completed exceptionally.");
Throwable cause = assertThrows(CompletionException.class,
() -> future.getNow(null)).getCause();
assertNotNull(cause, "Unable to find CompletionException cause for " + endpoint);
assertEquals(throwable.getClass(), cause.getClass());
assertEquals(throwable.getMessage(), cause.getMessage());
}
}
@Test
public void testImmediateCompletion() {
EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
build(Optional.empty(), INFO);
assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
readyFutures.futures().keySet());
assertComplete(readyFutures, EXTERNAL, INTERNAL);
}
@Test
public void testAddReadinessFuture() {
CompletableFuture<Void> foo = new CompletableFuture<>();
EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
addReadinessFuture("foo", foo).
build(Optional.empty(), INFO);
assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
readyFutures.futures().keySet());
assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
foo.complete(null);
assertComplete(readyFutures, EXTERNAL, INTERNAL);
}
@Test
public void testAddMultipleReadinessFutures() {
CompletableFuture<Void> foo = new CompletableFuture<>();
CompletableFuture<Void> bar = new CompletableFuture<>();
EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
addReadinessFuture("foo", foo).
addReadinessFuture("bar", bar).
build(Optional.empty(), INFO);
assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
readyFutures.futures().keySet());
assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
foo.complete(null);
assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
bar.complete(null);
assertComplete(readyFutures, EXTERNAL, INTERNAL);
}
@Test
public void testAddReadinessFutures() {
Map<Endpoint, CompletableFuture<Void>> bazFutures = new HashMap<>();
bazFutures.put(EXTERNAL, new CompletableFuture<>());
bazFutures.put(INTERNAL, new CompletableFuture<>());
EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
addReadinessFutures("baz", bazFutures).
build(Optional.empty(), INFO);
assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
readyFutures.futures().keySet());
assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
bazFutures.get(EXTERNAL).complete(null);
assertComplete(readyFutures, EXTERNAL);
assertIncomplete(readyFutures, INTERNAL);
bazFutures.get(INTERNAL).complete(null);
assertComplete(readyFutures, EXTERNAL, INTERNAL);
}
@Test
public void testFailedReadinessFuture() {
CompletableFuture<Void> foo = new CompletableFuture<>();
CompletableFuture<Void> bar = new CompletableFuture<>();
EndpointReadyFutures readyFutures = new EndpointReadyFutures.Builder().
addReadinessFuture("foo", foo).
addReadinessFuture("bar", bar).
build(Optional.empty(), INFO);
assertEquals(new HashSet<>(Arrays.asList(EXTERNAL, INTERNAL)),
readyFutures.futures().keySet());
assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
foo.complete(null);
assertIncomplete(readyFutures, EXTERNAL, INTERNAL);
bar.completeExceptionally(new RuntimeException("Failed."));
assertException(readyFutures, new RuntimeException("Failed."),
EXTERNAL, INTERNAL);
}
}
Loading…
Cancel
Save