KAFKA-14735: Improve KRaft metadata image change performance at high topic counts (#13280)

Introduces the use of persistent data structures in the KRaft metadata image to avoid copying the entire TopicsImage upon every change. Performance that was O(<number of topics in the cluster>) is now O(<number of topics changing>), which yields dramatic time and GC improvements for the most common topic-related metadata events. We abstract away the chosen underlying persistent collection library via ImmutableMap<> and ImmutableSet<> interfaces and static factory methods.
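
As a quick illustration, here is a minimal sketch (not part of this commit; the class name, keys, and values are invented for the example) of the copy-on-write usage pattern the new ImmutableMap interface enables:

    import org.apache.kafka.server.immutable.ImmutableMap;

    public class PersistentMapExample {
        public static void main(String[] args) {
            // Every "mutation" returns a new map that structurally shares
            // nearly all of its contents with the previous version.
            ImmutableMap<String, Integer> v1 = ImmutableMap.empty();
            ImmutableMap<String, Integer> v2 = v1.updated("topic-a", 3);
            ImmutableMap<String, Integer> v3 = v2.removed("topic-a");

            // Prior versions are untouched, so readers can keep using an
            // old metadata image while a new one is built from a delta.
            System.out.println(v1.isEmpty());              // true
            System.out.println(v2.get("topic-a"));         // 3
            System.out.println(v3.containsKey("topic-a")); // false
        }
    }

The in-place java.util.Map mutators (put, remove, putAll, and so on) throw UnsupportedOperationException, as the wrapper classes below document.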

Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>, Purshotam Chauhan <pchauhan@confluent.io>
Ron Dagostino, committed via GitHub (commit e27926f92b)
Changed files (lines changed in parentheses):

1. LICENSE-binary (3)
2. build.gradle (10)
3. checkstyle/import-control-jmh-benchmarks.xml (1)
4. checkstyle/import-control-metadata.xml (176)
5. checkstyle/import-control-server-common.xml (82)
6. checkstyle/import-control.xml (133)
7. core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala (6)
8. gradle/dependencies.gradle (2)
9. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java (235)
10. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java (90)
11. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java (112)
12. jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java (99)
13. licenses/pcollections-MIT (24)
14. metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java (40)
15. metadata/src/main/java/org/apache/kafka/image/TopicsImage.java (32)
16. metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java (8)
17. metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java (13)
18. server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java (64)
19. server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java (60)
20. server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java (223)
21. server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java (188)
22. server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java (146)
23. server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java (310)
24. server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java (274)

LICENSE-binary

@@ -313,7 +313,8 @@ argparse4j-0.7.0, see: licenses/argparse-MIT
jopt-simple-5.0.4, see: licenses/jopt-simple-MIT
slf4j-api-1.7.36, see: licenses/slf4j-MIT
slf4j-reload4j-1.7.36, see: licenses/slf4j-MIT
-classgraph-4.8.138, see: license/classgraph-MIT
+classgraph-4.8.138, see: licenses/classgraph-MIT
+pcollections-4.0.1, see: licenses/pcollections-MIT
---------------------------------------
BSD 2-Clause

build.gradle

@@ -1222,6 +1222,10 @@ project(':metadata') {
javadoc {
enabled = false
}
+checkstyle {
+configProperties = checkstyleConfigProperties("import-control-metadata.xml")
+}
}
project(':group-coordinator') {
@@ -1554,11 +1558,13 @@ project(':server-common') {
implementation libs.slf4jApi
implementation libs.metrics
implementation libs.joptSimple
+implementation libs.pcollections
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
testImplementation libs.hamcrest
testRuntimeOnly libs.slf4jlog4j
@@ -1605,6 +1611,10 @@ project(':server-common') {
clean.doFirst {
delete "$buildDir/kafka/"
}
+checkstyle {
+configProperties = checkstyleConfigProperties("import-control-server-common.xml")
+}
}
project(':storage:api') {

checkstyle/import-control-jmh-benchmarks.xml

@@ -51,6 +51,7 @@
<allow pkg="org.apache.kafka.storage"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.timeline" />

checkstyle/import-control-metadata.xml

@@ -0,0 +1,176 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<import-control pkg="org.apache.kafka">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
<subpackage name="common">
<subpackage name="metadata">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.protocol.types" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
</subpackage>
</subpackage>
<subpackage name="controller">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.image.writer" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metadata.authorizer" />
<allow pkg="org.apache.kafka.metadata.migration" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>
<subpackage name="image">
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.image.writer" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="metadata">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="authorizer">
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>
<subpackage name="fault">
<allow pkg="org.apache.kafka.server.fault" />
</subpackage>
</subpackage>
<subpackage name="metalog">
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
</import-control>

checkstyle/import-control-server-common.xml

@@ -0,0 +1,82 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<import-control pkg="org.apache.kafka">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
<subpackage name="queue">
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
<allow pkg="joptsimple" />
<subpackage name="common">
<allow pkg="org.apache.kafka.server.common" />
</subpackage>
<subpackage name="immutable">
<allow pkg="org.apache.kafka.server.util"/>
<!-- only the factory package can use persistent collection library-specific wrapper implementations -->
<!-- the library-specific wrapper implementation for PCollections -->
<allow pkg="org.apache.kafka.server.immutable.pcollections" />
<subpackage name="pcollections">
<allow pkg="org.pcollections" />
</subpackage>
</subpackage>
<subpackage name="metrics">
<allow pkg="com.yammer.metrics" />
</subpackage>
</subpackage>
</import-control>

checkstyle/import-control.xml

@@ -88,14 +88,6 @@
<allow pkg="org.apache.kafka.common.record" />
</subpackage>
<subpackage name="metadata">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.protocol.types" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
</subpackage>
<subpackage name="metrics">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
@@ -206,122 +198,6 @@
</subpackage>
</subpackage>
<subpackage name="controller">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.image.writer" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metadata.authorizer" />
<allow pkg="org.apache.kafka.metadata.migration" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>
<subpackage name="image">
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.image.writer" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="metadata">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="authorizer">
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>
<subpackage name="fault">
<allow pkg="org.apache.kafka.server.fault" />
</subpackage>
</subpackage>
<subpackage name="metalog">
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="queue">
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="clients">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
@@ -358,19 +234,10 @@
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
<allow pkg="joptsimple" />
-<!-- This is required to make AlterConfigPolicyTest work. -->
-<allow pkg="org.apache.kafka.server.policy" />
-<subpackage name="common">
-<allow pkg="org.apache.kafka.server.common" />
-</subpackage>
-<subpackage name="metrics">
-<allow pkg="com.yammer.metrics" />
-</subpackage>
<subpackage name="log">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.api" />

core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala

@@ -177,9 +177,9 @@ class BrokerMetadataPublisherTest {
private def topicsImage(
topics: Seq[TopicImage]
): TopicsImage = {
-val idsMap = topics.map(t => t.id -> t).toMap
-val namesMap = topics.map(t => t.name -> t).toMap
-new TopicsImage(idsMap.asJava, namesMap.asJava)
+var retval = TopicsImage.EMPTY
+topics.foreach { t => retval = retval.including(t) }
+retval
}
private def newMockDynamicConfigPublisher(

gradle/dependencies.gradle

@@ -108,6 +108,7 @@ versions += [
metrics: "2.2.0",
mockito: "4.9.0",
netty: "4.1.86.Final",
pcollections: "4.0.1",
powermock: "2.0.9",
reflections: "0.9.12",
reload4j: "1.2.19",
@@ -198,6 +199,7 @@ libs += [
mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito",
nettyHandler: "io.netty:netty-handler:$versions.netty",
nettyTransportNativeEpoll: "io.netty:netty-transport-native-epoll:$versions.netty",
pcollections: "org.pcollections:pcollections:$versions.pcollections",
powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock",
reflections: "org.reflections:reflections:$versions.reflections",

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java

@@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.metadata;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
import kafka.server.BrokerTopicStats;
import kafka.server.ClientQuotaManager;
import kafka.server.ClientRequestQuotaManager;
import kafka.server.ControllerMutationQuotaManager;
import kafka.server.FetchManager;
import kafka.server.ForwardingManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.RaftSupport;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
import kafka.server.SimpleApiVersionManager;
import kafka.server.builders.KafkaApisBuilder;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.server.metadata.MockConfigRepository;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import scala.Option;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class KRaftMetadataRequestBenchmark {
@Param({"500", "1000", "5000"})
private int topicCount;
@Param({"10", "20", "50"})
private int partitionCount;
private RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class);
private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
private AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class);
private Metrics metrics = new Metrics();
private int brokerId = 1;
private ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class);
private KRaftMetadataCache metadataCache = MetadataCache.kRaftMetadataCache(brokerId);
private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
private ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
private QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager,
clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Option.empty());
private FetchManager fetchManager = Mockito.mock(FetchManager.class);
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private KafkaApis kafkaApis;
private RequestChannel.Request allTopicMetadataRequest;
@Setup(Level.Trial)
public void setup() {
initializeMetadataCache();
kafkaApis = createKafkaApis();
allTopicMetadataRequest = buildAllTopicMetadataRequest();
}
private void initializeMetadataCache() {
MetadataDelta buildupMetadataDelta = new MetadataDelta(MetadataImage.EMPTY);
IntStream.range(0, 5).forEach(brokerId -> {
RegisterBrokerRecord.BrokerEndpointCollection endpoints = new RegisterBrokerRecord.BrokerEndpointCollection();
endpoints(brokerId).forEach(endpoint ->
endpoints.add(new RegisterBrokerRecord.BrokerEndpoint().
setHost(endpoint.host()).
setPort(endpoint.port()).
setName(endpoint.listener()).
setSecurityProtocol(endpoint.securityProtocol())));
buildupMetadataDelta.replay(new RegisterBrokerRecord().
setBrokerId(brokerId).
setBrokerEpoch(100L).
setFenced(false).
setRack(null).
setEndPoints(endpoints).
setIncarnationId(Uuid.fromString(Uuid.randomUuid().toString())));
});
IntStream.range(0, topicCount).forEach(topicNum -> {
Uuid topicId = Uuid.randomUuid();
buildupMetadataDelta.replay(new TopicRecord().setName("topic-" + topicNum).setTopicId(topicId));
IntStream.range(0, partitionCount).forEach(partitionId ->
buildupMetadataDelta.replay(new PartitionRecord().
setPartitionId(partitionId).
setTopicId(topicId).
setReplicas(Arrays.asList(0, 1, 3)).
setIsr(Arrays.asList(0, 1, 3)).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(partitionCount % 5).
setLeaderEpoch(0)));
});
metadataCache.setImage(buildupMetadataDelta.apply(MetadataProvenance.EMPTY));
}
private List<UpdateMetadataEndpoint> endpoints(final int brokerId) {
return Collections.singletonList(
new UpdateMetadataEndpoint()
.setHost("host_" + brokerId)
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value()));
}
private KafkaApis createKafkaApis() {
Properties kafkaProps = new Properties();
kafkaProps.put(KafkaConfig$.MODULE$.NodeIdProp(), brokerId + "");
kafkaProps.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
kafkaProps.put(KafkaConfig$.MODULE$.QuorumVotersProp(), "9000@foo:8092");
kafkaProps.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
KafkaConfig config = new KafkaConfig(kafkaProps);
return new KafkaApisBuilder().
setRequestChannel(requestChannel).
setMetadataSupport(new RaftSupport(forwardingManager, metadataCache)).
setReplicaManager(replicaManager).
setGroupCoordinator(groupCoordinator).
setTxnCoordinator(transactionCoordinator).
setAutoTopicCreationManager(autoTopicCreationManager).
setBrokerId(brokerId).
setConfig(config).
setConfigRepository(new MockConfigRepository()).
setMetadataCache(metadataCache).
setMetrics(metrics).
setAuthorizer(Optional.empty()).
setQuotas(quotaManagers).
setFetchManager(fetchManager).
setBrokerTopicStats(brokerTopicStats).
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)).
build();
}
@TearDown(Level.Trial)
public void tearDown() {
kafkaApis.close();
metrics.close();
}
private RequestChannel.Request buildAllTopicMetadataRequest() {
MetadataRequest metadataRequest = MetadataRequest.Builder.allTopics().build();
RequestHeader header = new RequestHeader(metadataRequest.apiKey(), metadataRequest.version(), "", 0);
ByteBuffer bodyBuffer = metadataRequest.serialize();
RequestContext context = new RequestContext(header, "1", null, principal,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, bodyBuffer, requestChannelMetrics, Option.empty());
}
@Benchmark
public void testMetadataRequestForAllTopics() {
kafkaApis.handleTopicMetadataRequest(allTopicMetadataRequest);
}
@Benchmark
public String testRequestToJson() {
return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString();
}
@Benchmark
public void testTopicIdInfo() {
metadataCache.topicIdInfo();
}
}

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.metadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class TopicsImageSingleRecordChangeBenchmark {
@Param({"12500", "25000", "50000", "100000"})
private int totalTopicCount;
@Param({"10"})
private int partitionsPerTopic;
@Param({"3"})
private int replicationFactor;
@Param({"10000"})
private int numReplicasPerBroker;
private TopicsDelta topicsDelta;
@Setup(Level.Trial)
public void setup() {
// build an image containing all the specified topics and partitions
TopicsDelta buildupTopicsDelta = TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
TopicsImage builtupTopicsImage = buildupTopicsDelta.apply();
// build a delta to apply within the benchmark code
// that adds a single topic-partition
topicsDelta = new TopicsDelta(builtupTopicsImage);
Uuid newTopicUuid = Uuid.randomUuid();
TopicRecord newTopicRecord = new TopicRecord().setName("newtopic").setTopicId(newTopicUuid);
topicsDelta.replay(newTopicRecord);
ArrayList<Integer> replicas = TopicsImageSnapshotLoadBenchmark.getReplicas(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker, 0);
ArrayList<Integer> isr = new ArrayList<>(replicas);
PartitionRecord newPartitionRecord = new PartitionRecord().
setPartitionId(0).
setTopicId(newTopicUuid).
setReplicas(replicas).
setIsr(isr).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(0);
topicsDelta.replay(newPartitionRecord);
System.out.print("(Adding a single topic to metadata having " + totalTopicCount + " total topics) ");
}
@Benchmark
public void testTopicsDeltaSingleTopicAdd() {
topicsDelta.apply();
}
}

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java

@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.metadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class TopicsImageSnapshotLoadBenchmark {
@Param({"12500", "25000", "50000", "100000"})
private int totalTopicCount;
@Param({"10"})
private int partitionsPerTopic;
@Param({"3"})
private int replicationFactor;
@Param({"10000"})
private int numReplicasPerBroker;
private TopicsDelta topicsDelta;
@Setup(Level.Trial)
public void setup() {
// build a delta to apply within the benchmark code
// that consists of all the topics and partitions that would get loaded in a snapshot
topicsDelta = getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
System.out.print("(Loading a snapshot containing " + totalTopicCount + " total topics) ");
}
static TopicsDelta getInitialTopicsDelta(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker) {
int numBrokers = getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
TopicsDelta buildupTopicsDelta = new TopicsDelta(TopicsImage.EMPTY);
final AtomicInteger currentLeader = new AtomicInteger(0);
IntStream.range(0, totalTopicCount).forEach(topicNumber -> {
Uuid topicId = Uuid.randomUuid();
buildupTopicsDelta.replay(new TopicRecord().setName("topic" + topicNumber).setTopicId(topicId));
IntStream.range(0, partitionsPerTopic).forEach(partitionNumber -> {
ArrayList<Integer> replicas = getReplicas(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker, currentLeader.get());
ArrayList<Integer> isr = new ArrayList<>(replicas);
buildupTopicsDelta.replay(new PartitionRecord().
setPartitionId(partitionNumber).
setTopicId(topicId).
setReplicas(replicas).
setIsr(isr).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(currentLeader.get()));
currentLeader.set((1 + currentLeader.get()) % numBrokers);
});
});
return buildupTopicsDelta;
}
static ArrayList<Integer> getReplicas(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker, int currentLeader) {
ArrayList<Integer> replicas = new ArrayList<>();
int numBrokers = getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
IntStream.range(0, replicationFactor).forEach(replicaNumber ->
replicas.add((replicaNumber + currentLeader) % numBrokers));
return replicas;
}
static int getNumBrokers(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker) {
int numBrokers = totalTopicCount * partitionsPerTopic * replicationFactor / numReplicasPerBroker;
return numBrokers - numBrokers % 3;
}
@Benchmark
public void testTopicsDeltaSnapshotLoad() {
topicsDelta.apply();
}
}

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.metadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class TopicsImageZonalOutageBenchmark {
@Param({"12500", "25000", "50000", "100000"})
private int totalTopicCount;
@Param({"10"})
private int partitionsPerTopic;
@Param({"3"})
private int replicationFactor;
@Param({"10000"})
private int numReplicasPerBroker;
private TopicsDelta topicsDelta;
@Setup(Level.Trial)
public void setup() {
// build an image containing all of the specified topics and partitions
TopicsDelta buildupTopicsDelta = TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
TopicsImage builtupTopicsImage = buildupTopicsDelta.apply();
// build a delta to apply within the benchmark code
// that perturbs all the topic-partitions for broker 0
// (as might happen in a zonal outage, one broker at a time, ultimately across 1/3 of the brokers in the cluster).
// It turns out that
topicsDelta = new TopicsDelta(builtupTopicsImage);
Set<Uuid> perturbedTopics = new HashSet<>();
builtupTopicsImage.topicsById().forEach((topicId, topicImage) ->
topicImage.partitions().forEach((partitionNumber, partitionRegistration) -> {
List<Integer> newIsr = Arrays.stream(partitionRegistration.isr).boxed().filter(n -> n != 0).collect(Collectors.toList());
if (newIsr.size() < replicationFactor) {
perturbedTopics.add(topicId);
topicsDelta.replay(new PartitionRecord().
setPartitionId(partitionNumber).
setTopicId(topicId).
setReplicas(Arrays.stream(partitionRegistration.replicas).boxed().collect(Collectors.toList())).
setIsr(newIsr).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(newIsr.get(0)));
}
})
);
int numBrokers = TopicsImageSnapshotLoadBenchmark.getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
System.out.print("(Perturbing 1 of " + numBrokers + " brokers, or " + perturbedTopics.size() + " topics within metadata having " + totalTopicCount + " total topics) ");
}
@Benchmark
public void testTopicsDeltaZonalOutage() {
topicsDelta.apply();
}
}

licenses/pcollections-MIT

@@ -0,0 +1,24 @@
MIT License
Copyright 2008-2011, 2014-2020, 2022 Harold Cooper, gil cattaneo, Gleb Frank,
Günther Grill, Ilya Gorbunov, Jirka Kremser, Jochen Theodorou, Johnny Lim,
Liam Miller, Mark Perry, Matei Dragu, Mike Klein, Oleg Osipenko, Ran Ari-Gur,
Shantanu Kumar, and Valeriy Vyrva.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java

@@ -24,13 +24,13 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
@@ -126,29 +126,27 @@ public final class TopicsDelta {
}
public TopicsImage apply() {
-Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
-Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());
-for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
-Uuid id = entry.getKey();
-TopicImage prevTopicImage = entry.getValue();
-TopicDelta delta = changedTopics.get(id);
-if (delta == null) {
-if (!deletedTopicIds.contains(id)) {
-newTopicsById.put(id, prevTopicImage);
-newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
-}
+ImmutableMap<Uuid, TopicImage> newTopicsById = image.topicsById();
+ImmutableMap<String, TopicImage> newTopicsByName = image.topicsByName();
+// apply all the deletes
+for (Uuid topicId: deletedTopicIds) {
+// it was deleted, so we have to remove it from the maps
+TopicImage originalTopicToBeDeleted = image.topicsById().get(topicId);
+if (originalTopicToBeDeleted == null) {
+throw new IllegalStateException("Missing topic id " + topicId);
} else {
-TopicImage newTopicImage = delta.apply();
-newTopicsById.put(id, newTopicImage);
-newTopicsByName.put(delta.name(), newTopicImage);
+newTopicsById = newTopicsById.removed(topicId);
+newTopicsByName = newTopicsByName.removed(originalTopicToBeDeleted.name());
}
}
-for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
-if (!newTopicsById.containsKey(entry.getKey())) {
-TopicImage newTopicImage = entry.getValue().apply();
-newTopicsById.put(newTopicImage.id(), newTopicImage);
-newTopicsByName.put(newTopicImage.name(), newTopicImage);
-}
+// apply all the updates/additions
+for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) {
+Uuid topicId = entry.getKey();
+TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+// put new information into the maps
+String topicName = newTopicToBeAddedOrUpdated.name();
+newTopicsById = newTopicsById.updated(topicId, newTopicToBeAddedOrUpdated);
+newTopicsByName = newTopicsByName.updated(topicName, newTopicToBeAddedOrUpdated);
}
return new TopicsImage(newTopicsById, newTopicsByName);
}
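
As a concrete illustration of the rewritten apply(), here is a minimal sketch (not part of the commit; the class and topic names are invented, and the usage mirrors the benchmarks above):

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.metadata.TopicRecord;
    import org.apache.kafka.image.TopicsDelta;
    import org.apache.kafka.image.TopicsImage;

    public class SingleTopicChangeExample {
        public static void main(String[] args) {
            TopicsImage image = TopicsImage.EMPTY; // in practice, a large existing image

            // Record one new topic in a delta against the (possibly huge) image.
            TopicsDelta delta = new TopicsDelta(image);
            delta.replay(new TopicRecord().setName("new-topic").setTopicId(Uuid.randomUuid()));

            // apply() now iterates only deletedTopicIds and changedTopics, calling
            // removed()/updated() on the persistent maps, so its cost scales with
            // the one changed topic rather than with the size of the image.
            TopicsImage next = delta.apply();
            System.out.println(next.topicsById().size());
        }
    }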

metadata/src/main/java/org/apache/kafka/image/TopicsImage.java

@@ -21,41 +21,45 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.util.TranslatedValueMapView;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Represents the topics in the metadata image.
*
* This class is thread-safe.
*/
public final class TopicsImage {
-public static final TopicsImage EMPTY =
-new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+public static final TopicsImage EMPTY = new TopicsImage(ImmutableMap.empty(), ImmutableMap.empty());
+private final ImmutableMap<Uuid, TopicImage> topicsById;
+private final ImmutableMap<String, TopicImage> topicsByName;
-private final Map<Uuid, TopicImage> topicsById;
-private final Map<String, TopicImage> topicsByName;
+public TopicsImage(ImmutableMap<Uuid, TopicImage> topicsById,
+ImmutableMap<String, TopicImage> topicsByName) {
+this.topicsById = topicsById;
+this.topicsByName = topicsByName;
+}
-public TopicsImage(Map<Uuid, TopicImage> topicsById,
-Map<String, TopicImage> topicsByName) {
-this.topicsById = Collections.unmodifiableMap(topicsById);
-this.topicsByName = Collections.unmodifiableMap(topicsByName);
+public TopicsImage including(TopicImage topic) {
+return new TopicsImage(
+this.topicsById.updated(topic.id(), topic),
+this.topicsByName.updated(topic.name(), topic));
}
public boolean isEmpty() {
return topicsById.isEmpty() && topicsByName.isEmpty();
}
-public Map<Uuid, TopicImage> topicsById() {
+public ImmutableMap<Uuid, TopicImage> topicsById() {
return topicsById;
}
-public Map<String, TopicImage> topicsByName() {
+public ImmutableMap<String, TopicImage> topicsByName() {
return topicsByName;
}
@@ -74,8 +78,8 @@ public final class TopicsImage {
}
public void write(ImageWriter writer, ImageWriterOptions options) {
-for (TopicImage topicImage : topicsById.values()) {
-topicImage.write(writer, options);
+for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+entry.getValue().write(writer, options);
}
}

metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java

@@ -99,12 +99,10 @@ public class ControllerMetricsTestUtils {
public static TopicsImage fakeTopicsImage(
TopicImage... topics
) {
-Map<Uuid, TopicImage> topicsById = new HashMap<>();
-Map<String, TopicImage> topicsByName = new HashMap<>();
+TopicsImage image = TopicsImage.EMPTY;
for (TopicImage topic : topics) {
-topicsById.put(topic.id(), topic);
-topicsByName.put(topic.name(), topic);
+image = image.including(topic);
}
-return new TopicsImage(topicsById, topicsByName);
+return image;
}
}

metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java

@@ -29,6 +29,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -74,18 +75,18 @@ public class TopicsImageTest {
return new TopicImage(name, id, partitionMap);
}
-private static Map<Uuid, TopicImage> newTopicsByIdMap(Collection<TopicImage> topics) {
-Map<Uuid, TopicImage> map = new HashMap<>();
+private static ImmutableMap<Uuid, TopicImage> newTopicsByIdMap(Collection<TopicImage> topics) {
+ImmutableMap<Uuid, TopicImage> map = TopicsImage.EMPTY.topicsById();
for (TopicImage topic : topics) {
-map.put(topic.id(), topic);
+map = map.updated(topic.id(), topic);
}
return map;
}
-private static Map<String, TopicImage> newTopicsByNameMap(Collection<TopicImage> topics) {
-Map<String, TopicImage> map = new HashMap<>();
+private static ImmutableMap<String, TopicImage> newTopicsByNameMap(Collection<TopicImage> topics) {
+ImmutableMap<String, TopicImage> map = TopicsImage.EMPTY.topicsByName();
for (TopicImage topic : topics) {
-map.put(topic.name(), topic);
+map = map.updated(topic.name(), topic);
}
return map;
}

server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable;
import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap;
import java.util.Map;
/**
* A persistent Hash-based Map wrapper.
* java.util.Map methods that mutate in-place will throw UnsupportedOperationException
*
* @param <K> the key type
* @param <V> the value type
*/
public interface ImmutableMap<K, V> extends Map<K, V> {
/**
* @return a wrapped hash-based persistent map that is empty
* @param <K> the key type
* @param <V> the value type
*/
static <K, V> ImmutableMap<K, V> empty() {
return PCollectionsImmutableMap.empty();
}
/**
* @param key the key
* @param value the value
* @return a wrapped hash-based persistent map that has a single mapping
* @param <K> the key type
* @param <V> the value type
*/
static <K, V> ImmutableMap<K, V> singleton(K key, V value) {
return PCollectionsImmutableMap.singleton(key, value);
}
/**
* @param key the key
* @param value the value
* @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary)
*/
ImmutableMap<K, V> updated(K key, V value);
/**
* @param key the key
* @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary)
*/
ImmutableMap<K, V> removed(K key);
}

server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable;
import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableSet;
import java.util.Set;
/**
* A persistent Hash-based Set wrapper
* java.util.Set methods that mutate in-place will throw UnsupportedOperationException
*
* @param <E> the element type
*/
public interface ImmutableSet<E> extends Set<E> {
/**
* @return a wrapped hash-based persistent set that is empty
* @param <E> the element type
*/
static <E> ImmutableSet<E> empty() {
return PCollectionsImmutableSet.empty();
}
/**
* @param e the element
* @return a wrapped hash-based persistent set that has a single element
* @param <E> the element type
*/
static <E> ImmutableSet<E> singleton(E e) {
return PCollectionsImmutableSet.singleton(e);
}
/**
* @param e the element
* @return a wrapped persistent set that differs from this one in that the given element is added (if necessary)
*/
ImmutableSet<E> added(E e);
/**
* @param e the element
* @return a wrapped persistent set that differs from this one in that the given element is removed (if necessary)
*/
ImmutableSet<E> removed(E e);
}
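
As with ImmutableMap above, a brief hypothetical sketch (not part of the commit) of the intended call pattern; added() and removed() chain naturally because each returns a new set.

import org.apache.kafka.server.immutable.ImmutableSet;

public class ImmutableSetUsageSketch {
    public static void main(String[] args) {
        ImmutableSet<Integer> none = ImmutableSet.empty();
        ImmutableSet<Integer> some = none.added(1).added(2);
        System.out.println(none.isEmpty());              // true -- original unchanged
        System.out.println(some.contains(2));            // true
        System.out.println(some.removed(2).contains(2)); // false
    }
}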

223
server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java

@@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable.pcollections;
import org.apache.kafka.server.immutable.ImmutableMap;
import org.pcollections.HashPMap;
import org.pcollections.HashTreePMap;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
@SuppressWarnings("deprecation")
public class PCollectionsImmutableMap<K, V> implements ImmutableMap<K, V> {
private final HashPMap<K, V> underlying;
/**
* @return a wrapped hash-based persistent map that is empty
* @param <K> the key type
* @param <V> the value type
*/
public static <K, V> PCollectionsImmutableMap<K, V> empty() {
return new PCollectionsImmutableMap<>(HashTreePMap.empty());
}
/**
* @param key the key
* @param value the value
* @return a wrapped hash-based persistent map that has a single mapping
* @param <K> the key type
* @param <V> the value type
*/
public static <K, V> PCollectionsImmutableMap<K, V> singleton(K key, V value) {
return new PCollectionsImmutableMap<>(HashTreePMap.singleton(key, value));
}
public PCollectionsImmutableMap(HashPMap<K, V> map) {
this.underlying = Objects.requireNonNull(map);
}
@Override
public ImmutableMap<K, V> updated(K key, V value) {
return new PCollectionsImmutableMap<>(underlying().plus(key, value));
}
@Override
public ImmutableMap<K, V> removed(K key) {
return new PCollectionsImmutableMap<>(underlying().minus(key));
}
@Override
public int size() {
return underlying().size();
}
@Override
public boolean isEmpty() {
return underlying().isEmpty();
}
@Override
public boolean containsKey(Object key) {
return underlying().containsKey(key);
}
@Override
public boolean containsValue(Object value) {
return underlying().containsValue(value);
}
@Override
public V get(Object key) {
return underlying().get(key);
}
@Override
public V put(K key, V value) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().put(key, value);
}
@Override
public V remove(Object key) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().remove(key);
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
// will throw UnsupportedOperationException; delegate anyway for testability
underlying().putAll(m);
}
@Override
public void clear() {
// will throw UnsupportedOperationException; delegate anyway for testability
underlying().clear();
}
@Override
public Set<K> keySet() {
return underlying().keySet();
}
@Override
public Collection<V> values() {
return underlying().values();
}
@Override
public Set<Entry<K, V>> entrySet() {
return underlying().entrySet();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PCollectionsImmutableMap<?, ?> that = (PCollectionsImmutableMap<?, ?>) o;
return underlying().equals(that.underlying());
}
@Override
public int hashCode() {
return underlying().hashCode();
}
@Override
public V getOrDefault(Object key, V defaultValue) {
return underlying().getOrDefault(key, defaultValue);
}
@Override
public void forEach(BiConsumer<? super K, ? super V> action) {
underlying().forEach(action);
}
@Override
public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
// will throw UnsupportedOperationException; delegate anyway for testability
underlying().replaceAll(function);
}
@Override
public V putIfAbsent(K key, V value) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().putIfAbsent(key, value);
}
@Override
public boolean remove(Object key, Object value) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().remove(key, value);
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().replace(key, oldValue, newValue);
}
@Override
public V replace(K key, V value) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().replace(key, value);
}
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().computeIfAbsent(key, mappingFunction);
}
@Override
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().computeIfPresent(key, remappingFunction);
}
@Override
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().compute(key, remappingFunction);
}
@Override
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().merge(key, value, remappingFunction);
}
@Override
public String toString() {
return "PCollectionsImmutableMap{" +
"underlying=" + underlying() +
'}';
}
// package-private for testing
HashPMap<K, V> underlying() {
return underlying;
}
}
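
A short behavioral sketch (assuming only the API shown in this diff; the class name is hypothetical): persistent updates via updated()/removed() succeed, while the inherited java.util.Map mutators throw because they delegate to HashPMap's deprecated mutating methods.

import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap;

public class PCollectionsImmutableMapSketch {
    public static void main(String[] args) {
        ImmutableMap<String, Integer> map = PCollectionsImmutableMap.singleton("k1", 1);
        // persistent update; shares structure with 'map' rather than copying it
        ImmutableMap<String, Integer> bigger = map.updated("k2", 2);
        System.out.println(bigger.size()); // 2
        System.out.println(map.size());    // 1 -- original unchanged
        try {
            bigger.put("k3", 3); // delegates to HashPMap.put, which throws
        } catch (UnsupportedOperationException expected) {
            System.out.println("in-place mutation rejected");
        }
    }
}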

188
server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java

@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable.pcollections;
import org.apache.kafka.server.immutable.ImmutableSet;
import org.pcollections.HashTreePSet;
import org.pcollections.MapPSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@SuppressWarnings("deprecation")
public class PCollectionsImmutableSet<E> implements ImmutableSet<E> {
private final MapPSet<E> underlying;
/**
* @return a wrapped hash-based persistent set that is empty
* @param <E> the element type
*/
public static <E> PCollectionsImmutableSet<E> empty() {
return new PCollectionsImmutableSet<>(HashTreePSet.empty());
}
/**
* @param e the element
* @return a wrapped hash-based persistent set that has a single element
* @param <E> the element type
*/
public static <E> PCollectionsImmutableSet<E> singleton(E e) {
return new PCollectionsImmutableSet<>(HashTreePSet.singleton(e));
}
public PCollectionsImmutableSet(MapPSet<E> set) {
this.underlying = Objects.requireNonNull(set);
}
@Override
public ImmutableSet<E> added(E e) {
return new PCollectionsImmutableSet<>(underlying().plus(e));
}
@Override
public ImmutableSet<E> removed(E e) {
return new PCollectionsImmutableSet<>(underlying().minus(e));
}
@Override
public int size() {
return underlying().size();
}
@Override
public boolean isEmpty() {
return underlying().isEmpty();
}
@Override
public boolean contains(Object o) {
return underlying().contains(o);
}
@Override
public Iterator<E> iterator() {
return underlying().iterator();
}
@Override
public void forEach(Consumer<? super E> action) {
underlying().forEach(action);
}
@Override
public Object[] toArray() {
return underlying().toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return underlying().toArray(a);
}
@Override
public boolean add(E e) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().add(e);
}
@Override
public boolean remove(Object o) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().remove(o);
}
@Override
public boolean containsAll(Collection<?> c) {
return underlying().containsAll(c);
}
@Override
public boolean addAll(Collection<? extends E> c) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().addAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().retainAll(c);
}
@Override
public boolean removeAll(Collection<?> c) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().removeAll(c);
}
@Override
public boolean removeIf(Predicate<? super E> filter) {
// will throw UnsupportedOperationException; delegate anyway for testability
return underlying().removeIf(filter);
}
@Override
public void clear() {
// will throw UnsupportedOperationException; delegate anyway for testability
underlying().clear();
}
@Override
public Spliterator<E> spliterator() {
return underlying().spliterator();
}
@Override
public Stream<E> stream() {
return underlying().stream();
}
@Override
public Stream<E> parallelStream() {
return underlying().parallelStream();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PCollectionsImmutableSet<?> that = (PCollectionsImmutableSet<?>) o;
return Objects.equals(underlying(), that.underlying());
}
@Override
public int hashCode() {
return underlying().hashCode();
}
@Override
public String toString() {
return "PCollectionsImmutableSet{" +
"underlying=" + underlying() +
'}';
}
// package-private for testing
MapPSet<E> underlying() {
return underlying;
}
}
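
Similarly for the set wrapper, a hypothetical sketch (not part of the commit): read paths such as iteration and streams behave like any java.util.Set, while in-place mutators are rejected by the underlying MapPSet.

import org.apache.kafka.server.immutable.ImmutableSet;
import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableSet;

public class PCollectionsImmutableSetSketch {
    public static void main(String[] args) {
        ImmutableSet<String> set = PCollectionsImmutableSet.<String>empty().added("a").added("b");
        set.forEach(System.out::println);          // read paths delegate to MapPSet
        System.out.println(set.stream().count());  // 2
        try {
            set.clear(); // delegates to MapPSet.clear, which throws
        } catch (UnsupportedOperationException expected) {
            System.out.println("in-place mutation rejected");
        }
    }
}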

146
server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java

@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable;
import org.mockito.Mockito;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
/**
* Facilitate testing of wrapper class delegation.
*
* We require the following things to test delegation:
*
* 1. A mock object to which the wrapper is expected to delegate method invocations
* 2. A way to define how the mock is expected to behave when its method is invoked
* 3. A way to define how to invoke the method on the wrapper
* 4. A way to test that the method on the mock is invoked correctly when the wrapper method is invoked
* 5. A way to test that any return value from the wrapper method is correct
* @param <D> delegate type
* @param <W> wrapper type
* @param <T> delegating method return type, if any
*/
public abstract class DelegationChecker<D, W, T> {
private final D mock;
private final W wrapper;
private Consumer<D> mockConsumer;
private Function<D, T> mockConfigurationFunction;
private T mockFunctionReturnValue;
private Consumer<W> wrapperConsumer;
private Function<W, T> wrapperFunctionApplier;
private Function<T, ?> mockFunctionReturnValueTransformation;
private boolean expectWrapperToWrapMockFunctionReturnValue;
private boolean persistentCollectionMethodInvokedCorrectly = false;
/**
* @param mock mock for the underlying delegate
* @param wrapperCreator how to create a wrapper for the mock
*/
protected DelegationChecker(D mock, Function<D, W> wrapperCreator) {
this.mock = Objects.requireNonNull(mock);
this.wrapper = Objects.requireNonNull(wrapperCreator).apply(mock);
}
/**
* @param wrapper the wrapper
* @return the underlying delegate for the given wrapper
*/
public abstract D unwrap(W wrapper);
public DelegationChecker<D, W, T> defineMockConfigurationForVoidMethodInvocation(Consumer<D> mockConsumer) {
this.mockConsumer = Objects.requireNonNull(mockConsumer);
return this;
}
public DelegationChecker<D, W, T> defineMockConfigurationForFunctionInvocation(Function<D, T> mockConfigurationFunction, T mockFunctionReturnValue) {
this.mockConfigurationFunction = Objects.requireNonNull(mockConfigurationFunction);
this.mockFunctionReturnValue = mockFunctionReturnValue;
return this;
}
public DelegationChecker<D, W, T> defineWrapperVoidMethodInvocation(Consumer<W> wrapperConsumer) {
this.wrapperConsumer = Objects.requireNonNull(wrapperConsumer);
return this;
}
public <R> DelegationChecker<D, W, T> defineWrapperFunctionInvocationAndMockReturnValueTransformation(
Function<W, T> wrapperFunctionApplier,
Function<T, R> expectedFunctionReturnValueTransformation) {
this.wrapperFunctionApplier = Objects.requireNonNull(wrapperFunctionApplier);
this.mockFunctionReturnValueTransformation = Objects.requireNonNull(expectedFunctionReturnValueTransformation);
return this;
}
public DelegationChecker<D, W, T> expectWrapperToWrapMockFunctionReturnValue() {
this.expectWrapperToWrapMockFunctionReturnValue = true;
return this;
}
public void doVoidMethodDelegationCheck() {
if (mockConsumer == null || wrapperConsumer == null ||
mockConfigurationFunction != null || wrapperFunctionApplier != null ||
mockFunctionReturnValue != null || mockFunctionReturnValueTransformation != null) {
throwExceptionForIllegalTestSetup();
}
// configure the mock to behave as desired
mockConsumer.accept(Mockito.doAnswer(invocation -> {
persistentCollectionMethodInvokedCorrectly = true;
return null;
}).when(mock));
// invoke the wrapper, which should invoke the mock as desired
wrapperConsumer.accept(wrapper);
// assert that the expected delegation to the mock actually occurred
assertTrue(persistentCollectionMethodInvokedCorrectly);
}
@SuppressWarnings("unchecked")
public void doFunctionDelegationCheck() {
if (mockConfigurationFunction == null || wrapperFunctionApplier == null ||
mockFunctionReturnValueTransformation == null ||
mockConsumer != null || wrapperConsumer != null) {
throwExceptionForIllegalTestSetup();
}
// configure the mock to behave as desired
when(mockConfigurationFunction.apply(mock)).thenAnswer(invocation -> {
persistentCollectionMethodInvokedCorrectly = true;
return mockFunctionReturnValue;
});
// invoke the wrapper, which should invoke the mock as desired
T wrapperReturnValue = wrapperFunctionApplier.apply(wrapper);
// assert that the expected delegation to the mock actually occurred, including any return value transformation
assertTrue(persistentCollectionMethodInvokedCorrectly);
Object transformedMockFunctionReturnValue = mockFunctionReturnValueTransformation.apply(mockFunctionReturnValue);
if (this.expectWrapperToWrapMockFunctionReturnValue) {
assertEquals(transformedMockFunctionReturnValue, unwrap((W) wrapperReturnValue));
} else {
assertEquals(transformedMockFunctionReturnValue, wrapperReturnValue);
}
}
private static void throwExceptionForIllegalTestSetup() {
throw new IllegalStateException(
"test setup error: must define both mock and wrapper consumers or both mock and wrapper functions");
}
}
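
To make the fluent pattern concrete before the real tests below, a self-contained sketch against a purely hypothetical ListWrapper (Mockito and JUnit 5 on the classpath assumed): one chain exercises a value-returning method, the other a void method.

import org.apache.kafka.server.immutable.DelegationChecker;
import java.util.List;
import static java.util.function.Function.identity;
import static org.mockito.Mockito.mock;

@SuppressWarnings("unchecked")
public class DelegationCheckerSketch {
    // hypothetical wrapper that simply forwards to a delegate list
    static final class ListWrapper {
        private final List<Object> delegate;
        ListWrapper(List<Object> delegate) { this.delegate = delegate; }
        int size() { return delegate.size(); }
        void clear() { delegate.clear(); }
        List<Object> delegate() { return delegate; }
    }
    static final class Checker<R> extends DelegationChecker<List<Object>, ListWrapper, R> {
        Checker() { super(mock(List.class), ListWrapper::new); }
        public List<Object> unwrap(ListWrapper wrapper) { return wrapper.delegate(); }
    }
    public static void main(String[] args) {
        // value-returning method: stub the mock, invoke the wrapper, compare results
        new Checker<Integer>()
            .defineMockConfigurationForFunctionInvocation(List::size, 42)
            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(ListWrapper::size, identity())
            .doFunctionDelegationCheck();
        // void method: verify the invocation reaches the mock
        new Checker<Void>()
            .defineMockConfigurationForVoidMethodInvocation(List::clear)
            .defineWrapperVoidMethodInvocation(ListWrapper::clear)
            .doVoidMethodDelegationCheck();
    }
}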

310
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java

@@ -0,0 +1,310 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable.pcollections;
import org.apache.kafka.server.immutable.DelegationChecker;
import org.apache.kafka.server.immutable.ImmutableMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.pcollections.HashPMap;
import org.pcollections.HashTreePMap;
import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import static java.util.function.Function.identity;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@SuppressWarnings({"unchecked", "deprecation"})
public class PCollectionsImmutableMapTest {
private static final HashPMap<Object, Object> SINGLETON_MAP = HashTreePMap.singleton(new Object(), new Object());
private static final class PCollectionsHashMapWrapperDelegationChecker<R> extends DelegationChecker<HashPMap<Object, Object>, PCollectionsImmutableMap<Object, Object>, R> {
public PCollectionsHashMapWrapperDelegationChecker() {
super(mock(HashPMap.class), PCollectionsImmutableMap::new);
}
public HashPMap<Object, Object> unwrap(PCollectionsImmutableMap<Object, Object> wrapper) {
return wrapper.underlying();
}
}
@Test
public void testEmptyMap() {
Assertions.assertEquals(HashTreePMap.empty(), ((PCollectionsImmutableMap<?, ?>) ImmutableMap.empty()).underlying());
}
@Test
public void testSingletonMap() {
Assertions.assertEquals(HashTreePMap.singleton(1, 2), ((PCollectionsImmutableMap<?, ?>) ImmutableMap.singleton(1, 2)).underlying());
}
@Test
public void testUnderlying() {
assertSame(SINGLETON_MAP, new PCollectionsImmutableMap<>(SINGLETON_MAP).underlying());
}
@Test
public void testDelegationOfAfterAdding() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this), eq(this)), SINGLETON_MAP)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.updated(this, this), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfAfterRemoving() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_MAP)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {1, 2})
public void testDelegationOfSize(int mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(HashPMap::size, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::size, identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(HashPMap::isEmpty, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::isEmpty, identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfContainsKey(boolean mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.containsKey(eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsKey(this), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfContainsValue(boolean mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.containsValue(eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsValue(this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfGet() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.get(eq(this)), new Object())
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.get(this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfPut() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.put(eq(this), eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.put(this, this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfRemoveByKey() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfPutAll() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.putAll(eq(Collections.emptyMap())))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.putAll(Collections.emptyMap()))
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfClear() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(HashPMap::clear)
.defineWrapperVoidMethodInvocation(PCollectionsImmutableMap::clear)
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfKeySet() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(HashPMap::keySet, Collections.emptySet())
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::keySet, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfValues() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(HashPMap::values, Collections.emptySet())
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::values, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfEntrySet() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(HashPMap::entrySet, Collections.emptySet())
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::entrySet, identity())
.doFunctionDelegationCheck();
}
@Test
public void testEquals() {
final HashPMap<Object, Object> mock = mock(HashPMap.class);
assertEquals(new PCollectionsImmutableMap<>(mock), new PCollectionsImmutableMap<>(mock));
final HashPMap<Object, Object> someOtherMock = mock(HashPMap.class);
assertNotEquals(new PCollectionsImmutableMap<>(mock), new PCollectionsImmutableMap<>(someOtherMock));
}
@Test
public void testHashCode() {
final HashPMap<Object, Object> mock = mock(HashPMap.class);
assertEquals(mock.hashCode(), new PCollectionsImmutableMap<>(mock).hashCode());
final HashPMap<Object, Object> someOtherMock = mock(HashPMap.class);
assertNotEquals(mock.hashCode(), new PCollectionsImmutableMap<>(someOtherMock).hashCode());
}
@Test
public void testDelegationOfGetOrDefault() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.getOrDefault(eq(this), eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.getOrDefault(this, this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfForEach() {
final BiConsumer<Object, Object> mockBiConsumer = mock(BiConsumer.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.forEach(eq(mockBiConsumer)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.forEach(mockBiConsumer))
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfReplaceAll() {
final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.replaceAll(eq(mockBiFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.replaceAll(mockBiFunction))
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfPutIfAbsent() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.putIfAbsent(eq(this), eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.putIfAbsent(this, this), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRemoveByKeyAndValue(boolean mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this), eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this, this), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this, this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfReplaceWhenMappedToAnyValue() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfComputeIfAbsent() {
final Function<Object, Object> mockFunction = mock(Function.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.computeIfAbsent(eq(this), eq(mockFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.computeIfAbsent(this, mockFunction))
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfComputeIfPresent() {
final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.computeIfPresent(eq(this), eq(mockBiFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.computeIfPresent(this, mockBiFunction))
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfCompute() {
final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.compute(eq(this), eq(mockBiFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.compute(this, mockBiFunction))
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfMerge() {
final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.merge(eq(this), eq(this), eq(mockBiFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.merge(this, this, mockBiFunction))
.doVoidMethodDelegationCheck();
}
@ParameterizedTest
@ValueSource(strings = {"a", "b"})
public void testDelegationOfToString(String mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(HashPMap::toString, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::toString,
text -> "PCollectionsImmutableMap{underlying=" + text + "}")
.doFunctionDelegationCheck();
}
}

274
server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java

@@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable.pcollections;
import org.apache.kafka.server.immutable.DelegationChecker;
import org.apache.kafka.server.immutable.ImmutableSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.pcollections.HashTreePSet;
import org.pcollections.MapPSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static java.util.function.Function.identity;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@SuppressWarnings({"unchecked", "deprecation"})
public class PCollectionsImmutableSetTest {
private static final MapPSet<Object> SINGLETON_SET = HashTreePSet.singleton(new Object());
private static final class PCollectionsHashSetWrapperDelegationChecker<R> extends DelegationChecker<MapPSet<Object>, PCollectionsImmutableSet<Object>, R> {
public PCollectionsHashSetWrapperDelegationChecker() {
super(mock(MapPSet.class), PCollectionsImmutableSet::new);
}
public MapPSet<Object> unwrap(PCollectionsImmutableSet<Object> wrapper) {
return wrapper.underlying();
}
}
@Test
public void testEmptySet() {
Assertions.assertEquals(HashTreePSet.empty(), ((PCollectionsImmutableSet<?>) ImmutableSet.empty()).underlying());
}
@Test
public void testSingletonSet() {
Assertions.assertEquals(HashTreePSet.singleton(1), ((PCollectionsImmutableSet<?>) ImmutableSet.singleton(1)).underlying());
}
@Test
public void testUnderlying() {
assertSame(SINGLETON_SET, new PCollectionsImmutableSet<>(SINGLETON_SET).underlying());
}
@Test
public void testDelegationOfAfterAdding() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this)), SINGLETON_SET)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.added(this), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfAfterRemoving() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_SET)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {1, 2})
public void testDelegationOfSize(int mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(MapPSet::size, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::size, identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(MapPSet::isEmpty, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::isEmpty, identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfContains(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.contains(eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.contains(this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfIterator() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(MapPSet::iterator, mock(Iterator.class))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::iterator, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfForEach() {
final Consumer<Object> mockConsumer = mock(Consumer.class);
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.forEach(eq(mockConsumer)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.forEach(mockConsumer))
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfToArray() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(MapPSet::toArray, new Object[0])
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::toArray, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfToArrayIntoGivenDestination() {
Object[] destinationArray = new Object[0];
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.toArray(eq(destinationArray)), new Object[0])
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.toArray(destinationArray), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfAdd(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.add(eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.add(this), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRemove(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfContainsAll(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.containsAll(eq(Collections.emptyList())), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsAll(Collections.emptyList()), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfAddAll(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.addAll(eq(Collections.emptyList())), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.addAll(Collections.emptyList()), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRetainAll(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.retainAll(eq(Collections.emptyList())), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.retainAll(Collections.emptyList()), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRemoveAll(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.removeAll(eq(Collections.emptyList())), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removeAll(Collections.emptyList()), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRemoveIf(boolean mockFunctionReturnValue) {
final Predicate<Object> mockPredicate = mock(Predicate.class);
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.removeIf(eq(mockPredicate)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removeIf(mockPredicate), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfClear() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(MapPSet::clear)
.defineWrapperVoidMethodInvocation(PCollectionsImmutableSet::clear)
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfSpliterator() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(MapPSet::spliterator, mock(Spliterator.class))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::spliterator, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfStream() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(MapPSet::stream, mock(Stream.class))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::stream, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfParallelStream() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(MapPSet::parallelStream, mock(Stream.class))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::parallelStream, identity())
.doFunctionDelegationCheck();
}
@Test
public void testEquals() {
final MapPSet<Object> mock = mock(MapPSet.class);
assertEquals(new PCollectionsImmutableSet<>(mock), new PCollectionsImmutableSet<>(mock));
final MapPSet<Object> someOtherMock = mock(MapPSet.class);
assertNotEquals(new PCollectionsImmutableSet<>(mock), new PCollectionsImmutableSet<>(someOtherMock));
}
@Test
public void testHashCode() {
final MapPSet<Object> mock = mock(MapPSet.class);
assertEquals(mock.hashCode(), new PCollectionsImmutableSet<>(mock).hashCode());
final MapPSet<Object> someOtherMock = mock(MapPSet.class);
assertNotEquals(mock.hashCode(), new PCollectionsImmutableSet<>(someOtherMock).hashCode());
}
@ParameterizedTest
@ValueSource(strings = {"a", "b"})
public void testDelegationOfToString(String mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(MapPSet::toString, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::toString,
text -> "PCollectionsImmutableSet{underlying=" + text + "}")
.doFunctionDelegationCheck();
}
}