Browse Source

KAFKA-15581: Introduce ELR (#14312)

This patch introduces preliminary changes for Eligible Leader Replicas (KIP-966)

* New MetadataVersion 16 (3.7-IV1)
* New record versions for PartitionRecord and PartitionChangeRecord
* New tagged fields on PartitionRecord and PartitionChangeRecord
* New static config "eligible.leader.replicas.enable" to gate the whole feature

Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
pull/14502/merge
Calvin Liu 11 months ago committed by GitHub
parent
commit
af747fbfed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      checkstyle/suppressions.xml
  2. 4
      core/src/main/scala/kafka/server/ControllerServer.scala
  3. 6
      core/src/main/scala/kafka/server/KafkaConfig.scala
  4. 2
      core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
  5. 2
      core/src/test/java/kafka/test/annotation/ClusterTest.java
  6. 20
      metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
  7. 89
      metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
  8. 30
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  9. 96
      metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
  10. 2
      metadata/src/main/java/org/apache/kafka/image/TopicImage.java
  11. 66
      metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
  12. 12
      metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
  13. 12
      metadata/src/main/resources/common/metadata/PartitionRecord.json
  14. 3
      metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
  15. 377
      metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
  16. 6
      metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
  17. 122
      metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
  18. 12
      metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
  19. 77
      metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
  20. 25
      server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
  21. 18
      server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
  22. 6
      tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java

2
checkstyle/suppressions.xml

@ -307,6 +307,8 @@ @@ -307,6 +307,8 @@
files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
<suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
files="(QuorumController).java"/>
<suppress checks="NPathComplexity"
files="(PartitionRegistration|PartitionChangeBuilder).java"/>
<suppress checks="CyclomaticComplexity"
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager).java"/>
<suppress checks="NPathComplexity"

4
core/src/main/scala/kafka/server/ControllerServer.scala

@ -245,6 +245,7 @@ class ControllerServer( @@ -245,6 +245,7 @@ class ControllerServer(
setQuorumFeatures(quorumFeatures).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
setDefaultMinIsr(config.minInSyncReplicas.intValue()).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
TimeUnit.MILLISECONDS)).
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
@ -262,7 +263,8 @@ class ControllerServer( @@ -262,7 +263,8 @@ class ControllerServer(
setDelegationTokenSecretKey(delegationTokenKeyString).
setDelegationTokenMaxLifeMs(config.delegationTokenMaxLifeMs).
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs)
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
setEligibleLeaderReplicasEnabled(config.elrEnabled)
}
controller = controllerBuilder.build()

6
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -399,6 +399,9 @@ object KafkaConfig { @@ -399,6 +399,9 @@ object KafkaConfig {
/** ZK to KRaft Migration configs */
val MigrationEnabledProp = "zookeeper.metadata.migration.enable"
/** Enable eligible leader replicas configs */
val ElrEnabledProp = "eligible.leader.replicas.enable"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
val EarlyStartListenersProp = "early.start.listeners"
@ -1236,6 +1239,7 @@ object KafkaConfig { @@ -1236,6 +1239,7 @@ object KafkaConfig {
.define(MetadataMaxIdleIntervalMsProp, INT, Defaults.MetadataMaxIdleIntervalMs, atLeast(0), LOW, MetadataMaxIdleIntervalMsDoc)
.defineInternal(ServerMaxStartupTimeMsProp, LONG, Defaults.ServerMaxStartupTimeMs, atLeast(0), MEDIUM, ServerMaxStartupTimeMsDoc)
.define(MigrationEnabledProp, BOOLEAN, false, HIGH, "Enable ZK to KRaft migration")
.define(ElrEnabledProp, BOOLEAN, false, HIGH, "Enable the Eligible leader replicas")
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)
@ -1736,6 +1740,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami @@ -1736,6 +1740,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp)
val elrEnabled: Boolean = getBoolean(KafkaConfig.ElrEnabledProp)
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
case "broker" => BrokerRole

2
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java

@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest { @@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testDefaults(ClusterConfig config) {
Assertions.assertEquals(MetadataVersion.IBP_3_7_IV0, config.metadataVersion());
Assertions.assertEquals(MetadataVersion.IBP_3_7_IV1, config.metadataVersion());
}
}

2
core/src/test/java/kafka/test/annotation/ClusterTest.java

@ -41,6 +41,6 @@ public @interface ClusterTest { @@ -41,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV0;
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV1;
ClusterConfigProperty[] serverProperties() default {};
}

20
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java

@ -44,6 +44,7 @@ import java.util.Iterator; @@ -44,6 +44,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
@ -434,6 +435,25 @@ public class ConfigurationControlManager { @@ -434,6 +435,25 @@ public class ConfigurationControlManager {
}
}
/**
 * Look up the value of a single config key for the given topic.
 *
 * A value stored explicitly for the topic wins. Otherwise the value is taken from the
 * effective topic configs computed with no topic-level overrides.
 *
 * @param topicName The name of the topic whose config is requested.
 * @param configKey The config key to look up.
 * @return          The config value, or null if the key is found in neither place.
 */
String getTopicConfig(String topicName, String configKey) throws NoSuchElementException {
    Map<String, String> topicOverrides = configData.get(new ConfigResource(Type.TOPIC, topicName));
    if (topicOverrides != null && topicOverrides.containsKey(configKey)) {
        // The topic has its own entry for this key.
        return topicOverrides.get(configKey);
    }
    // No topic-level entry: fall back to the effective configs built from an empty override map.
    Map<String, ConfigEntry> fallbackConfigs = computeEffectiveTopicConfigs(Collections.emptyMap());
    return fallbackConfigs.containsKey(configKey) ? fallbackConfigs.get(configKey).value() : null;
}
public Map<ConfigResource, ResultOrError<Map<String, String>>> describeConfigs(
long lastCommittedOffset, Map<ConfigResource, Collection<String>> resources) {
Map<ConfigResource, ResultOrError<Map<String, String>>> results = new HashMap<>();

89
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java

@ -17,9 +17,12 @@ @@ -17,9 +17,12 @@
package org.apache.kafka.controller;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
@ -44,6 +47,8 @@ public class PartitionChangeBuilder { @@ -44,6 +47,8 @@ public class PartitionChangeBuilder {
public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
if (record.isr() != null) return false;
if (record.eligibleLeaderReplicas() != null) return false;
if (record.lastKnownELR() != null) return false;
if (record.leader() != NO_LEADER_CHANGE) return false;
if (record.replicas() != null) return false;
if (record.removingReplicas() != null) return false;
@ -79,9 +84,14 @@ public class PartitionChangeBuilder { @@ -79,9 +84,14 @@ public class PartitionChangeBuilder {
private List<Integer> targetReplicas;
private List<Integer> targetRemoving;
private List<Integer> targetAdding;
private List<Integer> targetElr;
private List<Integer> targetLastKnownElr;
private List<Integer> uncleanShutdownReplicas;
private Election election = Election.ONLINE;
private LeaderRecoveryState targetLeaderRecoveryState;
private boolean zkMigrationEnabled;
private boolean eligibleLeaderReplicasEnabled;
private int minISR;
public PartitionChangeBuilder(
@ -89,7 +99,8 @@ public class PartitionChangeBuilder { @@ -89,7 +99,8 @@ public class PartitionChangeBuilder {
Uuid topicId,
int partitionId,
IntPredicate isAcceptableLeader,
MetadataVersion metadataVersion
MetadataVersion metadataVersion,
int minISR
) {
this.partition = partition;
this.topicId = topicId;
@ -97,11 +108,15 @@ public class PartitionChangeBuilder { @@ -97,11 +108,15 @@ public class PartitionChangeBuilder {
this.isAcceptableLeader = isAcceptableLeader;
this.metadataVersion = metadataVersion;
this.zkMigrationEnabled = false;
this.eligibleLeaderReplicasEnabled = false;
this.minISR = minISR;
this.targetIsr = Replicas.toList(partition.isr);
this.targetReplicas = Replicas.toList(partition.replicas);
this.targetRemoving = Replicas.toList(partition.removingReplicas);
this.targetAdding = Replicas.toList(partition.addingReplicas);
this.targetElr = Replicas.toList(partition.elr);
this.targetLastKnownElr = Replicas.toList(partition.lastKnownElr);
this.targetLeaderRecoveryState = partition.leaderRecoveryState;
}
@ -124,6 +139,11 @@ public class PartitionChangeBuilder { @@ -124,6 +139,11 @@ public class PartitionChangeBuilder {
return this;
}
/**
 * Set the replicas that went through an unclean shutdown.
 * These replicas are filtered out of the target ELR in maybePopulateTargetElr;
 * a null list (the initial state) means nothing is filtered.
 *
 * @param uncleanShutdownReplicas The broker ids to exclude from the ELR, or null.
 * @return                        This builder, for call chaining.
 */
public PartitionChangeBuilder setUncleanShutdownReplicas(List<Integer> uncleanShutdownReplicas) {
this.uncleanShutdownReplicas = uncleanShutdownReplicas;
return this;
}
public PartitionChangeBuilder setElection(Election election) {
this.election = election;
return this;
@ -149,6 +169,11 @@ public class PartitionChangeBuilder { @@ -149,6 +169,11 @@ public class PartitionChangeBuilder {
return this;
}
/**
 * Turn eligible leader replicas (ELR) handling on or off for this builder.
 * The constructor initializes the flag to false. While it is false,
 * maybePopulateTargetElr returns without computing anything and
 * maybeUpdateRecordElr resets the target ELR and last known ELR to empty lists.
 *
 * @param eligibleLeaderReplicasEnabled True to maintain the ELR fields.
 * @return                              This builder, for call chaining.
 */
public PartitionChangeBuilder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}
// VisibleForTesting
static class ElectionResult {
final int node;
@ -328,15 +353,20 @@ public class PartitionChangeBuilder { @@ -328,15 +353,20 @@ public class PartitionChangeBuilder {
completeReassignmentIfNeeded();
maybePopulateTargetElr();
tryElection(record);
triggerLeaderEpochBumpIfNeeded(record);
maybeUpdateRecordElr(record);
if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
// Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
record.setIsr(targetIsr);
}
setAssignmentChanges(record);
if (targetLeaderRecoveryState != partition.leaderRecoveryState) {
@ -346,7 +376,7 @@ public class PartitionChangeBuilder { @@ -346,7 +376,7 @@ public class PartitionChangeBuilder {
if (changeRecordIsNoOp(record)) {
return Optional.empty();
} else {
return Optional.of(new ApiMessageAndVersion(record, (short) 0));
return Optional.of(new ApiMessageAndVersion(record, metadataVersion.partitionChangeRecordVersion()));
}
}
@ -362,6 +392,58 @@ public class PartitionChangeBuilder { @@ -362,6 +392,58 @@ public class PartitionChangeBuilder {
}
}
/**
 * Copy the target ELR and last known ELR into the change record, but only for the
 * fields that differ from what the partition currently holds.
 *
 * Both targets are first reset to empty lists when ELR is disabled, or when the record
 * already carries an ISR (which the election step sets for an unclean leader election).
 *
 * @param record The partition change record being built.
 */
private void maybeUpdateRecordElr(PartitionChangeRecord record) {
    // A non-null ISR on the record at this point means an unclean election set it.
    boolean uncleanElectionHappened = record.isr() != null;
    boolean keepElr = eligibleLeaderReplicasEnabled && !uncleanElectionHappened;
    if (!keepElr) {
        targetElr = Collections.emptyList();
        targetLastKnownElr = Collections.emptyList();
    }

    List<Integer> currentElr = Replicas.toList(partition.elr);
    boolean elrChanged = !targetElr.equals(currentElr);
    if (elrChanged) {
        record.setEligibleLeaderReplicas(targetElr);
    }

    List<Integer> currentLastKnownElr = Replicas.toList(partition.lastKnownElr);
    boolean lastKnownElrChanged = !targetLastKnownElr.equals(currentLastKnownElr);
    if (lastKnownElrChanged) {
        record.setLastKnownELR(targetLastKnownElr);
    }
}
/**
 * Recompute targetElr and targetLastKnownElr from the partition's current ISR,
 * the current targets, the target ISR and the unclean shutdown replicas.
 *
 * Does nothing when ELR is disabled. When the target ISR size is at least minISR,
 * both targets are cleared instead.
 */
private void maybePopulateTargetElr() {
if (!eligibleLeaderReplicasEnabled) return;
// If the target ISR size is larger than or equal to the min ISR, clear the ELR and the last known ELR.
if (targetIsr.size() >= minISR) {
targetElr = Collections.emptyList();
targetLastKnownElr = Collections.emptyList();
return;
}
Set<Integer> targetIsrSet = new HashSet<>(targetIsr);
// Tracking the ELR. The new ELR is expected to
// 1. Include the current ISR.
// 2. Exclude the replicas that are also in the target ISR.
// 3. Exclude the unclean shutdown replicas.
// To do that, we first union the current ISR and the current target ELR, then filter out
// the target ISR and the unclean shutdown replicas.
Set<Integer> candidateSet = new HashSet<>(targetElr);
Arrays.stream(partition.isr).forEach(ii -> candidateSet.add(ii));
targetElr = candidateSet.stream()
.filter(replica -> !targetIsrSet.contains(replica))
// A null uncleanShutdownReplicas list means there is nothing to exclude.
.filter(replica -> uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica))
.collect(Collectors.toList());
// Calculate the new last known ELR. It includes any ISR member seen since the ISR size dropped below the min ISR.
// To reduce metadata usage, the last known ELR excludes the members of the new ELR and of the target ISR.
// The candidate set is reused here, so it still holds the old ELR and current ISR members; unclean shutdown
// replicas are not filtered out at this step.
targetLastKnownElr.forEach(ii -> candidateSet.add(ii));
targetLastKnownElr = candidateSet.stream()
.filter(replica -> !targetIsrSet.contains(replica))
// targetElr here is the list that was just reassigned above.
.filter(replica -> !targetElr.contains(replica))
.collect(Collectors.toList());
}
@Override
public String toString() {
return "PartitionChangeBuilder(" +
@ -373,6 +455,9 @@ public class PartitionChangeBuilder { @@ -373,6 +455,9 @@ public class PartitionChangeBuilder {
", targetReplicas=" + targetReplicas +
", targetRemoving=" + targetRemoving +
", targetAdding=" + targetAdding +
", targetElr=" + targetElr +
", targetLastKnownElr=" + targetLastKnownElr +
", uncleanShutdownReplicas=" + uncleanShutdownReplicas +
", election=" + election +
", targetLeaderRecoveryState=" + targetLeaderRecoveryState +
')';

30
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@ -203,6 +203,7 @@ public final class QuorumController implements Controller { @@ -203,6 +203,7 @@ public final class QuorumController implements Controller {
private QuorumFeatures quorumFeatures = null;
private short defaultReplicationFactor = 3;
private int defaultNumPartitions = 1;
private int defaultMinIsr = 1;
private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
@ -215,6 +216,7 @@ public final class QuorumController implements Controller { @@ -215,6 +216,7 @@ public final class QuorumController implements Controller {
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
private boolean zkMigrationEnabled = false;
private boolean eligibleLeaderReplicasEnabled = false;
private DelegationTokenCache tokenCache;
private String tokenSecretKeyString;
private long delegationTokenMaxLifeMs;
@ -280,6 +282,11 @@ public final class QuorumController implements Controller { @@ -280,6 +282,11 @@ public final class QuorumController implements Controller {
return this;
}
/**
 * Set the default min ISR held by this controller builder. The builder's initial value is 1.
 *
 * @param defaultMinIsr The default min ISR.
 * @return              This builder, for call chaining.
 */
public Builder setDefaultMinIsr(int defaultMinIsr) {
this.defaultMinIsr = defaultMinIsr;
return this;
}
public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
this.replicaPlacer = replicaPlacer;
return this;
@ -340,6 +347,11 @@ public final class QuorumController implements Controller { @@ -340,6 +347,11 @@ public final class QuorumController implements Controller {
return this;
}
/**
 * Set whether eligible leader replicas (ELR) is enabled. The builder's initial value is false.
 *
 * @param eligibleLeaderReplicasEnabled True to enable ELR.
 * @return                              This builder, for call chaining.
 */
public Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}
public Builder setDelegationTokenCache(DelegationTokenCache tokenCache) {
this.tokenCache = tokenCache;
return this;
@ -405,6 +417,7 @@ public final class QuorumController implements Controller { @@ -405,6 +417,7 @@ public final class QuorumController implements Controller {
quorumFeatures,
defaultReplicationFactor,
defaultNumPartitions,
defaultMinIsr,
replicaPlacer,
leaderImbalanceCheckIntervalNs,
maxIdleIntervalNs,
@ -421,7 +434,8 @@ public final class QuorumController implements Controller { @@ -421,7 +434,8 @@ public final class QuorumController implements Controller {
tokenSecretKeyString,
delegationTokenMaxLifeMs,
delegationTokenExpiryTimeMs,
delegationTokenExpiryCheckIntervalMs
delegationTokenExpiryCheckIntervalMs,
eligibleLeaderReplicasEnabled
);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
@ -1746,6 +1760,8 @@ public final class QuorumController implements Controller { @@ -1746,6 +1760,8 @@ public final class QuorumController implements Controller {
private final boolean zkMigrationEnabled;
private final boolean eligibleLeaderReplicasEnabled;
/**
* The maximum number of records per batch to allow.
*/
@ -1769,6 +1785,7 @@ public final class QuorumController implements Controller { @@ -1769,6 +1785,7 @@ public final class QuorumController implements Controller {
QuorumFeatures quorumFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
int defaultMinIsr,
ReplicaPlacer replicaPlacer,
OptionalLong leaderImbalanceCheckIntervalNs,
OptionalLong maxIdleIntervalNs,
@ -1785,7 +1802,8 @@ public final class QuorumController implements Controller { @@ -1785,7 +1802,8 @@ public final class QuorumController implements Controller {
String tokenSecretKeyString,
long delegationTokenMaxLifeMs,
long delegationTokenExpiryTimeMs,
long delegationTokenExpiryCheckIntervalMs
long delegationTokenExpiryCheckIntervalMs,
boolean eligibleLeaderReplicasEnabled
) {
this.nonFatalFaultHandler = nonFatalFaultHandler;
this.fatalFaultHandler = fatalFaultHandler;
@ -1854,6 +1872,8 @@ public final class QuorumController implements Controller { @@ -1854,6 +1872,8 @@ public final class QuorumController implements Controller {
setLogContext(logContext).
setDefaultReplicationFactor(defaultReplicationFactor).
setDefaultNumPartitions(defaultNumPartitions).
setDefaultMinIsr(defaultMinIsr).
setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
@ -1887,9 +1907,11 @@ public final class QuorumController implements Controller { @@ -1887,9 +1907,11 @@ public final class QuorumController implements Controller {
this.zkRecordConsumer = new MigrationRecordConsumer();
this.zkMigrationEnabled = zkMigrationEnabled;
this.recordRedactor = new RecordRedactor(configSchema);
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
log.info("Creating new QuorumController with clusterId {}.{}",
clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : "");
log.info("Creating new QuorumController with clusterId {}.{}{}",
clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : "",
eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : "");
this.raftClient.register(metaLogListener);
}

96
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@ -119,6 +119,7 @@ import java.util.stream.Collectors; @@ -119,6 +119,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INELIGIBLE_REPLICA;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
@ -150,11 +151,14 @@ public class ReplicationControlManager { @@ -150,11 +151,14 @@ public class ReplicationControlManager {
private LogContext logContext = null;
private short defaultReplicationFactor = (short) 3;
private int defaultNumPartitions = 1;
private int defaultMinIsr = 1;
private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
private ConfigurationControlManager configurationControl = null;
private ClusterControlManager clusterControl = null;
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private FeatureControlManager featureControl = null;
private boolean eligibleLeaderReplicasEnabled = false;
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
@ -176,6 +180,16 @@ public class ReplicationControlManager { @@ -176,6 +180,16 @@ public class ReplicationControlManager {
return this;
}
/**
 * Set the default min ISR held by this builder. The builder's initial value is 1.
 *
 * @param defaultMinIsr The default min ISR.
 * @return              This builder, for call chaining.
 */
Builder setDefaultMinIsr(int defaultMinIsr) {
this.defaultMinIsr = defaultMinIsr;
return this;
}
/**
 * Set whether eligible leader replicas (ELR) is enabled. The builder's initial value is false.
 *
 * @param eligibleLeaderReplicasEnabled True to enable ELR.
 * @return                              This builder, for call chaining.
 */
Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}
Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
return this;
@ -216,7 +230,9 @@ public class ReplicationControlManager { @@ -216,7 +230,9 @@ public class ReplicationControlManager {
logContext,
defaultReplicationFactor,
defaultNumPartitions,
defaultMinIsr,
maxElectionsPerImbalance,
eligibleLeaderReplicasEnabled,
configurationControl,
clusterControl,
createTopicPolicy,
@ -279,6 +295,16 @@ public class ReplicationControlManager { @@ -279,6 +295,16 @@ public class ReplicationControlManager {
*/
private final int defaultNumPartitions;
/**
* The default min ISR that is used if a CreateTopics request does not specify one.
*/
private final int defaultMinIsr;
/**
* True if eligible leader replicas is enabled.
*/
private final boolean eligibleLeaderReplicasEnabled;
/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
@ -356,7 +382,9 @@ public class ReplicationControlManager { @@ -356,7 +382,9 @@ public class ReplicationControlManager {
LogContext logContext,
short defaultReplicationFactor,
int defaultNumPartitions,
int defaultMinIsr,
int maxElectionsPerImbalance,
boolean eligibleLeaderReplicasEnabled,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
Optional<CreateTopicPolicy> createTopicPolicy,
@ -366,7 +394,9 @@ public class ReplicationControlManager { @@ -366,7 +394,9 @@ public class ReplicationControlManager {
this.log = logContext.logger(ReplicationControlManager.class);
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.defaultMinIsr = defaultMinIsr;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.configurationControl = configurationControl;
this.createTopicPolicy = createTopicPolicy;
this.featureControl = featureControl;
@ -761,7 +791,7 @@ public class ReplicationControlManager { @@ -761,7 +791,7 @@ public class ReplicationControlManager {
for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
int partitionIndex = partEntry.getKey();
PartitionRegistration info = partEntry.getValue();
records.add(info.toRecord(topicId, partitionIndex));
records.add(info.toRecord(topicId, partitionIndex, featureControl.metadataVersion().partitionRecordVersion()));
}
return ApiError.NONE;
}
@ -947,6 +977,10 @@ public class ReplicationControlManager { @@ -947,6 +977,10 @@ public class ReplicationControlManager {
return new HashSet<>(imbalancedPartitions);
}
/**
 * Report whether eligible leader replicas (ELR) is in effect.
 *
 * @return True only if the ELR flag given to this manager is set and the current
 *         metadata version reports ELR as supported.
 */
boolean isElrEnabled() {
return eligibleLeaderReplicasEnabled && featureControl.metadataVersion().isElrSupported();
}
ControllerResult<AlterPartitionResponseData> alterPartition(
ControllerRequestContext context,
AlterPartitionRequestData request
@ -1009,9 +1043,11 @@ public class ReplicationControlManager { @@ -1009,9 +1043,11 @@ public class ReplicationControlManager {
topic.id,
partitionId,
clusterControl::isActive,
featureControl.metadataVersion()
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@ -1397,9 +1433,12 @@ public class ReplicationControlManager { @@ -1397,9 +1433,12 @@ public class ReplicationControlManager {
topicId,
partitionId,
clusterControl::isActive,
featureControl.metadataVersion()
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic)
);
builder.setElection(election).setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setElection(election)
.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
if (electionType == ElectionType.PREFERRED) {
@ -1532,10 +1571,12 @@ public class ReplicationControlManager { @@ -1532,10 +1571,12 @@ public class ReplicationControlManager {
topicPartition.topicId(),
topicPartition.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion()
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
builder.build().ifPresent(records::add);
}
@ -1655,17 +1696,8 @@ public class ReplicationControlManager { @@ -1655,17 +1696,8 @@ public class ReplicationControlManager {
"Unable to replicate the partition " + replicationFactor +
" time(s): All brokers are currently fenced or in controlled shutdown.");
}
records.add(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(partitionId).
setTopicId(topicId).
setReplicas(replicas).
setIsr(isr).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(isr.get(0)).
setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0));
records.add(buildPartitionRegistration(replicas, isr)
.toRecord(topicId, partitionId, featureControl.metadataVersion().partitionRecordVersion()));
partitionId++;
}
}
@ -1754,9 +1786,11 @@ public class ReplicationControlManager { @@ -1754,9 +1786,11 @@ public class ReplicationControlManager {
topicIdPart.topicId(),
topicIdPart.partitionId(),
isAcceptableLeader,
featureControl.metadataVersion()
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@ -1867,9 +1901,11 @@ public class ReplicationControlManager { @@ -1867,9 +1901,11 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion()
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topicName)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@ -1925,9 +1961,11 @@ public class ReplicationControlManager { @@ -1925,9 +1961,11 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion()
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString())
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
@ -1995,6 +2033,26 @@ public class ReplicationControlManager { @@ -1995,6 +2033,26 @@ public class ReplicationControlManager {
setReplicas(Replicas.toList(partition.replicas)));
}
/**
 * Compute the effective min ISR for a topic.
 *
 * The min ISR comes from the topic's min.insync.replicas config as returned by the
 * configuration control manager, falling back to the default min ISR when no value is found.
 * The result is capped at the topic's replication factor, taken as the number of replicas
 * of partition 0. If the topic or its partition 0 is unknown, the default replication
 * factor is used as the cap.
 *
 * Visible for testing.
 *
 * @param topicName The name of the topic.
 * @return          The smaller of the min ISR and the replication factor.
 */
int getTopicEffectiveMinIsr(String topicName) {
    int currentMinIsr = defaultMinIsr;
    String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
    if (minIsrConfig != null) {
        currentMinIsr = Integer.parseInt(minIsrConfig);
    } else {
        log.warn("Can't find the min isr config for topic: {} using default value {}",
            topicName, defaultMinIsr);
    }

    // Walk topic name -> topic id -> topic info -> partition 0 -> replicas, treating any
    // missing link as "unknown" rather than relying on a caught NullPointerException.
    Optional<Integer> knownReplicationFactor = Optional.ofNullable(topicsByName.get(topicName))
        .map(topicId -> topics.get(topicId))
        .map(topicInfo -> topicInfo.parts.get(0))
        .map(registration -> registration.replicas)
        .map(replicas -> replicas.length);

    int replicationFactor = defaultReplicationFactor;
    if (knownReplicationFactor.isPresent()) {
        replicationFactor = knownReplicationFactor.get();
    } else {
        log.warn("Can't find the replication factor for topic: {} using default value {}.",
            topicName, replicationFactor);
    }
    return Math.min(currentMinIsr, replicationFactor);
}
private static final class IneligibleReplica {
private final int replicaId;
private final String reason;

2
metadata/src/main/java/org/apache/kafka/image/TopicImage.java

@ -68,7 +68,7 @@ public final class TopicImage { @@ -68,7 +68,7 @@ public final class TopicImage {
for (Entry<Integer, PartitionRegistration> entry : partitions.entrySet()) {
int partitionId = entry.getKey();
PartitionRegistration partition = entry.getValue();
writer.write(partition.toRecord(id, partitionId));
writer.write(partition.toRecord(id, partitionId, options.metadataVersion().partitionRecordVersion()));
}
}

66
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java

@ -42,6 +42,8 @@ public class PartitionRegistration { @@ -42,6 +42,8 @@ public class PartitionRegistration {
private int[] isr;
private int[] removingReplicas = Replicas.NONE;
private int[] addingReplicas = Replicas.NONE;
private int[] elr = Replicas.NONE;
private int[] lastKnownElr = Replicas.NONE;
private Integer leader;
private LeaderRecoveryState leaderRecoveryState;
private Integer leaderEpoch;
@ -67,6 +69,16 @@ public class PartitionRegistration { @@ -67,6 +69,16 @@ public class PartitionRegistration {
return this;
}
/**
 * Set the eligible leader replicas (ELR) for the partition being built.
 * The builder starts with Replicas.NONE; build() throws IllegalStateException if this is null.
 *
 * @param elr The ELR broker ids.
 * @return    This builder, for call chaining.
 */
public Builder setElr(int[] elr) {
this.elr = elr;
return this;
}
/**
 * Set the last known ELR for the partition being built.
 * The builder starts with Replicas.NONE; build() throws IllegalStateException if this is null.
 *
 * @param lastKnownElr The last known ELR broker ids.
 * @return             This builder, for call chaining.
 */
public Builder setLastKnownElr(int[] lastKnownElr) {
this.lastKnownElr = lastKnownElr;
return this;
}
public Builder setLeader(Integer leader) {
this.leader = leader;
return this;
@ -104,6 +116,10 @@ public class PartitionRegistration { @@ -104,6 +116,10 @@ public class PartitionRegistration {
throw new IllegalStateException("You must set leader epoch.");
} else if (partitionEpoch == null) {
throw new IllegalStateException("You must set partition epoch.");
} else if (elr == null) {
throw new IllegalStateException("You must set ELR.");
} else if (lastKnownElr == null) {
throw new IllegalStateException("You must set last known elr.");
}
return new PartitionRegistration(
@ -114,7 +130,9 @@ public class PartitionRegistration { @@ -114,7 +130,9 @@ public class PartitionRegistration {
leader,
leaderRecoveryState,
leaderEpoch,
partitionEpoch
partitionEpoch,
elr,
lastKnownElr
);
}
}
@ -123,6 +141,8 @@ public class PartitionRegistration { @@ -123,6 +141,8 @@ public class PartitionRegistration {
public final int[] isr;
public final int[] removingReplicas;
public final int[] addingReplicas;
public final int[] elr;
public final int[] lastKnownElr;
public final int leader;
public final LeaderRecoveryState leaderRecoveryState;
public final int leaderEpoch;
@ -140,12 +160,14 @@ public class PartitionRegistration { @@ -140,12 +160,14 @@ public class PartitionRegistration {
record.leader(),
LeaderRecoveryState.of(record.leaderRecoveryState()),
record.leaderEpoch(),
record.partitionEpoch());
record.partitionEpoch(),
Replicas.toArray(record.eligibleLeaderReplicas()),
Replicas.toArray(record.lastKnownELR()));
}
private PartitionRegistration(int[] replicas, int[] isr, int[] removingReplicas,
int[] addingReplicas, int leader, LeaderRecoveryState leaderRecoveryState,
int leaderEpoch, int partitionEpoch) {
int leaderEpoch, int partitionEpoch, int[] elr, int[] lastKnownElr) {
this.replicas = replicas;
this.isr = isr;
this.removingReplicas = removingReplicas;
@ -154,6 +176,10 @@ public class PartitionRegistration { @@ -154,6 +176,10 @@ public class PartitionRegistration {
this.leaderRecoveryState = leaderRecoveryState;
this.leaderEpoch = leaderEpoch;
this.partitionEpoch = partitionEpoch;
// We could parse a lower version record without elr/lastKnownElr.
this.elr = elr == null ? new int[0] : elr;
this.lastKnownElr = lastKnownElr == null ? new int[0] : lastKnownElr;
}
public PartitionRegistration merge(PartitionChangeRecord record) {
@ -177,6 +203,8 @@ public class PartitionRegistration { @@ -177,6 +203,8 @@ public class PartitionRegistration {
LeaderRecoveryState newLeaderRecoveryState = leaderRecoveryState.changeTo(record.leaderRecoveryState());
int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas());
int[] newLastKnownElr = (record.lastKnownELR() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownELR());
return new PartitionRegistration(newReplicas,
newIsr,
newRemovingReplicas,
@ -184,7 +212,9 @@ public class PartitionRegistration { @@ -184,7 +212,9 @@ public class PartitionRegistration {
newLeader,
newLeaderRecoveryState,
newLeaderEpoch,
partitionEpoch + 1);
partitionEpoch + 1,
newElr,
newLastKnownElr);
}
public String diff(PartitionRegistration prev) {
@ -229,6 +259,18 @@ public class PartitionRegistration { @@ -229,6 +259,18 @@ public class PartitionRegistration {
append(prev.leaderEpoch).append(" -> ").append(leaderEpoch);
prefix = ", ";
}
if (!Arrays.equals(elr, prev.elr)) {
builder.append(prefix).append("elr: ").
append(Arrays.toString(prev.elr)).
append(" -> ").append(Arrays.toString(elr));
prefix = ", ";
}
if (!Arrays.equals(lastKnownElr, prev.lastKnownElr)) {
builder.append(prefix).append("lastKnownElr: ").
append(Arrays.toString(prev.lastKnownElr)).
append(" -> ").append(Arrays.toString(lastKnownElr));
prefix = ", ";
}
if (partitionEpoch != prev.partitionEpoch) {
builder.append(prefix).append("partitionEpoch: ").
append(prev.partitionEpoch).append(" -> ").append(partitionEpoch);
@ -256,8 +298,8 @@ public class PartitionRegistration { @@ -256,8 +298,8 @@ public class PartitionRegistration {
return replicas.length == 0 ? LeaderConstants.NO_LEADER : replicas[0];
}
public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId) {
return new ApiMessageAndVersion(new PartitionRecord().
public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, short version) {
PartitionRecord record = new PartitionRecord().
setPartitionId(partitionId).
setTopicId(topicId).
setReplicas(Replicas.toList(replicas)).
@ -267,7 +309,12 @@ public class PartitionRegistration { @@ -267,7 +309,12 @@ public class PartitionRegistration {
setLeader(leader).
setLeaderRecoveryState(leaderRecoveryState.value()).
setLeaderEpoch(leaderEpoch).
setPartitionEpoch(partitionEpoch), (short) 0);
setPartitionEpoch(partitionEpoch);
if (version > 0) {
record.setEligibleLeaderReplicas(Replicas.toList(elr)).
setLastKnownELR(Replicas.toList(lastKnownElr));
}
return new ApiMessageAndVersion(record, version);
}
public LeaderAndIsrPartitionState toLeaderAndIsrPartitionState(TopicPartition tp,
@ -290,6 +337,7 @@ public class PartitionRegistration { @@ -290,6 +337,7 @@ public class PartitionRegistration {
@Override
public int hashCode() {
    // Array fields are hashed by content via Arrays.hashCode. The order in which the
    // values are handed to Objects.hash determines the result, so it must stay fixed.
    int replicasHash = Arrays.hashCode(replicas);
    int isrHash = Arrays.hashCode(isr);
    int removingReplicasHash = Arrays.hashCode(removingReplicas);
    int elrHash = Arrays.hashCode(elr);
    int lastKnownElrHash = Arrays.hashCode(lastKnownElr);
    int addingReplicasHash = Arrays.hashCode(addingReplicas);
    return Objects.hash(
        replicasHash,
        isrHash,
        removingReplicasHash,
        elrHash,
        lastKnownElrHash,
        addingReplicasHash,
        leader,
        leaderRecoveryState,
        leaderEpoch,
        partitionEpoch);
}
@ -301,6 +349,8 @@ public class PartitionRegistration { @@ -301,6 +349,8 @@ public class PartitionRegistration {
Arrays.equals(isr, other.isr) &&
Arrays.equals(removingReplicas, other.removingReplicas) &&
Arrays.equals(addingReplicas, other.addingReplicas) &&
Arrays.equals(elr, other.elr) &&
Arrays.equals(lastKnownElr, other.lastKnownElr) &&
leader == other.leader &&
leaderRecoveryState == other.leaderRecoveryState &&
leaderEpoch == other.leaderEpoch &&
@ -314,6 +364,8 @@ public class PartitionRegistration { @@ -314,6 +364,8 @@ public class PartitionRegistration {
builder.append(", isr=").append(Arrays.toString(isr));
builder.append(", removingReplicas=").append(Arrays.toString(removingReplicas));
builder.append(", addingReplicas=").append(Arrays.toString(addingReplicas));
builder.append(", elr=").append(Arrays.toString(elr));
builder.append(", lastKnownElr=").append(Arrays.toString(lastKnownElr));
builder.append(", leader=").append(leader);
builder.append(", leaderRecoveryState=").append(leaderRecoveryState);
builder.append(", leaderEpoch=").append(leaderEpoch);

12
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json

@ -17,7 +17,9 @@ @@ -17,7 +17,9 @@
"apiKey": 5,
"type": "metadata",
"name": "PartitionChangeRecord",
"validVersions": "0",
"validVersions": "0-1",
// Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
//
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
@ -40,6 +42,12 @@ @@ -40,6 +42,12 @@
"versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
"about": "null if the adding replicas didn't change; the new adding replicas otherwise." },
{ "name": "LeaderRecoveryState", "type": "int8", "default": "-1", "versions": "0+", "taggedVersions": "0+", "tag": 5,
"about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." }
"about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." },
{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6,
"about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
"about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }
]
}

12
metadata/src/main/resources/common/metadata/PartitionRecord.json

@ -17,7 +17,9 @@ @@ -17,7 +17,9 @@
"apiKey": 3,
"type": "metadata",
"name": "PartitionRecord",
"validVersions": "0",
"validVersions": "0-1",
// Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
//
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
@ -39,6 +41,12 @@ @@ -39,6 +41,12 @@
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "The epoch of the partition leader." },
{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "An epoch that gets incremented each time we change anything in the partition." }
"about": "An epoch that gets incremented each time we change anything in the partition." },
{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1,
"about": "The eligible leader replicas of this partition." },
{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2,
"about": "The last known eligible leader replicas of this partition." }
]
}

3
metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java

@ -57,6 +57,7 @@ import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; @@ -57,6 +57,7 @@ import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@ -136,6 +137,8 @@ public class ConfigurationControlManagerTest { @@ -136,6 +137,8 @@ public class ConfigurationControlManagerTest {
setName("def").setValue("blah"));
assertEquals(toMap(entry("abc", "x,y,z"), entry("def", "blah")),
manager.getConfigs(MYTOPIC));
assertEquals("x,y,z", manager.getTopicConfig(MYTOPIC.name(), "abc"));
assertTrue(manager.getTopicConfig(MYTOPIC.name(), "none-exists") == null);
}
@Test

377
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java

@ -38,9 +38,9 @@ import java.util.Arrays; @@ -38,9 +38,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.controller.PartitionChangeBuilder.Election;
import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@ -53,13 +53,17 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -53,13 +53,17 @@ import static org.junit.jupiter.params.provider.Arguments.arguments;
@Timeout(value = 40)
public class PartitionChangeBuilderTest {
/**
 * Argument source for the parameterized tests: one entry per PartitionChangeRecord
 * version, from the lowest to the highest supported version inclusive, each passed
 * as a short.
 */
private static Stream<Arguments> partitionChangeRecordVersions() {
    return IntStream
        .rangeClosed(
            PartitionChangeRecord.LOWEST_SUPPORTED_VERSION,
            PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION)
        .mapToObj(recordVersion -> Arguments.of((short) recordVersion));
}
@Test
public void testChangeRecordIsNoOp() {
/* If the next few checks fail please update them based on the latest schema and make sure
* to update changeRecordIsNoOp to take into account the new schema or tagged fields.
*/
// Check that the supported versions haven't changed
assertEquals(0, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION);
assertEquals(1, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION);
assertEquals(0, PartitionChangeRecord.LOWEST_SUPPORTED_VERSION);
// For the latest version check that the number of tagged fields hasn't changed
TaggedFields taggedFields = (TaggedFields) PartitionChangeRecord.SCHEMA_0.get(2).def.type;
@ -73,6 +77,10 @@ public class PartitionChangeBuilderTest { @@ -73,6 +77,10 @@ public class PartitionChangeBuilderTest {
setRemovingReplicas(Arrays.asList(1))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setAddingReplicas(Arrays.asList(4))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setEligibleLeaderReplicas(Arrays.asList(5))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setLastKnownELR(Arrays.asList(6))));
assertFalse(
changeRecordIsNoOp(
new PartitionChangeRecord()
@ -92,12 +100,16 @@ public class PartitionChangeBuilderTest { @@ -92,12 +100,16 @@ public class PartitionChangeBuilderTest {
private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
private static PartitionChangeBuilder createFooBuilder() {
return createFooBuilder(MetadataVersion.latest());
/**
 * Picks the metadata version used when building for a given PartitionChangeRecord version:
 * IBP_3_7_IV1 when isElrEnabled(version) holds, IBP_3_7_IV0 otherwise.
 */
private static MetadataVersion metadataVersionForPartitionChangeRecordVersion(short version) {
    if (isElrEnabled(version)) {
        return MetadataVersion.IBP_3_7_IV1;
    }
    return MetadataVersion.IBP_3_7_IV0;
}
private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataVersion) {
return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersion);
return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersion, 2);
}
/**
 * Builds a PartitionChangeBuilder for the FOO partition at the given record version.
 * Every replica id except 3 is accepted by the predicate passed to the builder, and the
 * ELR flag is switched on or off according to isElrEnabled(version).
 */
private static PartitionChangeBuilder createFooBuilder(short version) {
    MetadataVersion metadataVersion = metadataVersionForPartitionChangeRecordVersion(version);
    PartitionChangeBuilder builder =
        new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersion, 2);
    return builder.setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
@ -113,8 +125,12 @@ public class PartitionChangeBuilderTest { @@ -113,8 +125,12 @@ public class PartitionChangeBuilderTest {
private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
private static PartitionChangeBuilder createBarBuilder() {
return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, MetadataVersion.latest());
/**
 * Reports whether the tests treat ELR as enabled for a PartitionChangeRecord version:
 * true for any version of 1 or above, false for version 0 and below.
 */
private static boolean isElrEnabled(short partitionChangeRecordVersion) {
    return partitionChangeRecordVersion >= 1;
}
/**
 * Builds a PartitionChangeBuilder for the BAR partition at the given record version.
 * Every replica id except 3 is accepted by the predicate passed to the builder, and the
 * ELR flag is switched on or off according to isElrEnabled(version).
 */
private static PartitionChangeBuilder createBarBuilder(short version) {
    MetadataVersion metadataVersion = metadataVersionForPartitionChangeRecordVersion(version);
    PartitionChangeBuilder builder =
        new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, metadataVersion, 2);
    return builder.setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static final PartitionRegistration BAZ = new PartitionRegistration.Builder().
@ -128,8 +144,8 @@ public class PartitionChangeBuilderTest { @@ -128,8 +144,8 @@ public class PartitionChangeBuilderTest {
private final static Uuid BAZ_ID = Uuid.fromString("wQzt5gkSTwuQNXZF5gIw7A");
private static PartitionChangeBuilder createBazBuilder() {
return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true, MetadataVersion.latest());
/**
 * Builds a PartitionChangeBuilder for the BAZ partition at the given record version.
 * The predicate passed to the builder accepts every replica id, and the ELR flag is
 * switched on or off according to isElrEnabled(version).
 */
private static PartitionChangeBuilder createBazBuilder(short version) {
    MetadataVersion metadataVersion = metadataVersionForPartitionChangeRecordVersion(version);
    PartitionChangeBuilder builder =
        new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true, metadataVersion, 2);
    return builder.setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static final PartitionRegistration OFFLINE = new PartitionRegistration.Builder().
@ -143,8 +159,8 @@ public class PartitionChangeBuilderTest { @@ -143,8 +159,8 @@ public class PartitionChangeBuilderTest {
private final static Uuid OFFLINE_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
private static PartitionChangeBuilder createOfflineBuilder() {
return new PartitionChangeBuilder(OFFLINE, OFFLINE_ID, 0, r -> r == 1, MetadataVersion.latest());
/**
 * Builds a PartitionChangeBuilder for the OFFLINE partition at the given record version.
 * Only replica id 1 is accepted by the predicate passed to the builder, and the ELR flag
 * is switched on or off according to isElrEnabled(version).
 */
private static PartitionChangeBuilder createOfflineBuilder(short version) {
    MetadataVersion metadataVersion = metadataVersionForPartitionChangeRecordVersion(version);
    PartitionChangeBuilder builder =
        new PartitionChangeBuilder(OFFLINE, OFFLINE_ID, 0, r -> r == 1, metadataVersion, 2);
    return builder.setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static void assertElectLeaderEquals(PartitionChangeBuilder builder,
@ -155,29 +171,30 @@ public class PartitionChangeBuilderTest { @@ -155,29 +171,30 @@ public class PartitionChangeBuilderTest {
assertEquals(expectedUnclean, electionResult.unclean);
}
@Test
public void testElectLeader() {
assertElectLeaderEquals(createFooBuilder().setElection(Election.PREFERRED), 2, false);
assertElectLeaderEquals(createFooBuilder(), 1, false);
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN), 1, false);
assertElectLeaderEquals(createFooBuilder()
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testElectLeader(short version) {
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.PREFERRED), 2, false);
assertElectLeaderEquals(createFooBuilder(version), 1, false);
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN), 1, false);
assertElectLeaderEquals(createFooBuilder(version)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 3))), 1, false);
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN)
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 3))), 1, false);
assertElectLeaderEquals(createFooBuilder()
assertElectLeaderEquals(createFooBuilder(version)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))), NO_LEADER, false);
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN).
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN).
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))), 2, true);
assertElectLeaderEquals(
createFooBuilder().setElection(Election.UNCLEAN)
createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(4))).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
4,
false
);
assertElectLeaderEquals(createBazBuilder().setElection(Election.PREFERRED), 3, false);
assertElectLeaderEquals(createBazBuilder(), 3, false);
assertElectLeaderEquals(createBazBuilder().setElection(Election.UNCLEAN), 3, false);
assertElectLeaderEquals(createBazBuilder(version).setElection(Election.PREFERRED), 3, false);
assertElectLeaderEquals(createBazBuilder(version), 3, false);
assertElectLeaderEquals(createBazBuilder(version).setElection(Election.UNCLEAN), 3, false);
}
private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
@ -187,13 +204,14 @@ public class PartitionChangeBuilderTest { @@ -187,13 +204,14 @@ public class PartitionChangeBuilderTest {
assertEquals(expectedLeader, record.leader());
}
@Test
public void testTriggerLeaderEpochBumpIfNeeded() {
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(),
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testTriggerLeaderEpochBumpIfNeeded(short version) {
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version),
new PartitionChangeRecord(), NO_LEADER_CHANGE);
// Shrinking the ISR doesn't increase the leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder().setTargetIsrWithBrokerStates(
createFooBuilder(version).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
),
new PartitionChangeRecord(),
@ -201,7 +219,7 @@ public class PartitionChangeBuilderTest { @@ -201,7 +219,7 @@ public class PartitionChangeBuilderTest {
);
// Expanding the ISR doesn't increase the leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder().setTargetIsrWithBrokerStates(
createFooBuilder(version).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))
),
new PartitionChangeRecord(),
@ -209,24 +227,24 @@ public class PartitionChangeBuilderTest { @@ -209,24 +227,24 @@ public class PartitionChangeBuilderTest {
);
// Expanding the ISR during migration doesn't increase leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder()
createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4)))
.setZkMigrationEnabled(true),
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
NO_LEADER_CHANGE);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
new PartitionChangeRecord().setLeader(2), 2);
// Check that the leader epoch is bumped if the ISR shrinks and isSkipLeaderEpochBumpSupported is not supported.
// See KAFKA-15021 for details.
testTriggerLeaderEpochBumpIfNeededLeader(
new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.IBP_3_5_IV2)
new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.IBP_3_5_IV2, 2)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
),
@ -235,11 +253,12 @@ public class PartitionChangeBuilderTest { @@ -235,11 +253,12 @@ public class PartitionChangeBuilderTest {
);
}
@Test
public void testLeaderEpochBumpZkMigration() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testLeaderEpochBumpZkMigration(short version) {
// KAFKA-15109: Shrinking the ISR while in ZK migration mode requires a leader epoch bump
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder()
createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
.setZkMigrationEnabled(true),
@ -248,7 +267,7 @@ public class PartitionChangeBuilderTest { @@ -248,7 +267,7 @@ public class PartitionChangeBuilderTest {
);
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder()
createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
.setZkMigrationEnabled(false),
@ -276,17 +295,19 @@ public class PartitionChangeBuilderTest { @@ -276,17 +295,19 @@ public class PartitionChangeBuilderTest {
);
}
@Test
public void testNoChange() {
assertEquals(Optional.empty(), createFooBuilder().build());
assertEquals(Optional.empty(), createFooBuilder().setElection(Election.UNCLEAN).build());
assertEquals(Optional.empty(), createBarBuilder().build());
assertEquals(Optional.empty(), createBarBuilder().setElection(Election.UNCLEAN).build());
assertEquals(Optional.empty(), createBazBuilder().setElection(Election.PREFERRED).build());
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testNoChange(short version) {
assertEquals(Optional.empty(), createFooBuilder(version).build());
assertEquals(Optional.empty(), createFooBuilder(version).setElection(Election.UNCLEAN).build());
assertEquals(Optional.empty(), createBarBuilder(version).build());
assertEquals(Optional.empty(), createBarBuilder(version).setElection(Election.UNCLEAN).build());
assertEquals(Optional.empty(), createBazBuilder(version).setElection(Election.PREFERRED).build());
}
@Test
public void testIsrChangeDoesntBumpLeaderEpoch() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testIsrChangeDoesntBumpLeaderEpoch(short version) {
// Changing the ISR should not cause the leader epoch to increase
assertEquals(
// Expected
@ -296,11 +317,11 @@ public class PartitionChangeBuilderTest { @@ -296,11 +317,11 @@ public class PartitionChangeBuilderTest {
.setTopicId(FOO_ID)
.setPartitionId(0)
.setIsr(Arrays.asList(2, 1)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
version
)
),
// Actual
createFooBuilder()
createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
)
@ -308,28 +329,31 @@ public class PartitionChangeBuilderTest { @@ -308,28 +329,31 @@ public class PartitionChangeBuilderTest {
);
}
@Test
public void testIsrChangeAndLeaderChange() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testIsrChangeAndLeaderChange(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setIsr(Arrays.asList(2, 3)).
setLeader(2), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createFooBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 3))).build());
setLeader(2), version)),
createFooBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 3))).build());
}
@Test
public void testReassignmentRearrangesReplicas() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testReassignmentRearrangesReplicas(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(3, 2, 1)),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createFooBuilder().setTargetReplicas(Arrays.asList(3, 2, 1)).build());
version)),
createFooBuilder(version).setTargetReplicas(Arrays.asList(3, 2, 1)).build());
}
@Test
public void testIsrEnlargementCompletesReassignment() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testIsrEnlargementCompletesReassignment(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(BAR_ID).
setPartitionId(0).
@ -338,12 +362,13 @@ public class PartitionChangeBuilderTest { @@ -338,12 +362,13 @@ public class PartitionChangeBuilderTest {
setLeader(2).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createBarBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3, 4))).build());
version)),
createBarBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3, 4))).build());
}
@Test
public void testRevertReassignment() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testRevertReassignment(short version) {
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR);
assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
assertEquals(Arrays.asList(1, 2, 3), revert.isr());
@ -354,8 +379,8 @@ public class PartitionChangeBuilderTest { @@ -354,8 +379,8 @@ public class PartitionChangeBuilderTest {
setLeader(1).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createBarBuilder().
version)),
createBarBuilder(version).
setTargetReplicas(revert.replicas()).
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(revert.isr())).
setTargetRemoving(Collections.emptyList()).
@ -363,8 +388,9 @@ public class PartitionChangeBuilderTest { @@ -363,8 +388,9 @@ public class PartitionChangeBuilderTest {
build());
}
@Test
public void testRemovingReplicaReassignment() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testRemovingReplicaReassignment(short version) {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2)));
assertEquals(Collections.singletonList(3), replicas.removing());
@ -376,15 +402,16 @@ public class PartitionChangeBuilderTest { @@ -376,15 +402,16 @@ public class PartitionChangeBuilderTest {
setReplicas(Arrays.asList(1, 2)).
setIsr(Arrays.asList(2, 1)).
setLeader(1),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createFooBuilder().
version)),
createFooBuilder(version).
setTargetReplicas(replicas.replicas()).
setTargetRemoving(replicas.removing()).
build());
}
@Test
public void testAddingReplicaReassignment() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testAddingReplicaReassignment(short version) {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2, 3, 4)));
assertEquals(Collections.emptyList(), replicas.removing());
@ -395,15 +422,16 @@ public class PartitionChangeBuilderTest { @@ -395,15 +422,16 @@ public class PartitionChangeBuilderTest {
setPartitionId(0).
setReplicas(Arrays.asList(1, 2, 3, 4)).
setAddingReplicas(Collections.singletonList(4)),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createFooBuilder().
version)),
createFooBuilder(version).
setTargetReplicas(replicas.replicas()).
setTargetAdding(replicas.adding()).
build());
}
@Test
public void testUncleanLeaderElection() {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testUncleanLeaderElection(short version) {
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(FOO_ID)
@ -411,11 +439,11 @@ public class PartitionChangeBuilderTest { @@ -411,11 +439,11 @@ public class PartitionChangeBuilderTest {
.setIsr(Arrays.asList(2))
.setLeader(2)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
version
);
assertEquals(
Optional.of(expectedRecord),
createFooBuilder().setElection(Election.UNCLEAN)
createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))).build()
);
@ -426,15 +454,16 @@ public class PartitionChangeBuilderTest { @@ -426,15 +454,16 @@ public class PartitionChangeBuilderTest {
.setIsr(Arrays.asList(1))
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
version
);
assertEquals(
Optional.of(expectedRecord),
createOfflineBuilder().setElection(Election.UNCLEAN).build()
createOfflineBuilder(version).setElection(Election.UNCLEAN).build()
);
assertEquals(
Optional.of(expectedRecord),
createOfflineBuilder().setElection(Election.UNCLEAN)
createOfflineBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2))).build()
);
}
@ -471,7 +500,8 @@ public class PartitionChangeBuilderTest { @@ -471,7 +500,8 @@ public class PartitionChangeBuilderTest {
FOO_ID,
0,
brokerId -> false,
metadataVersion
metadataVersion,
2
);
offlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// Set the target ISR to empty to indicate that the last leader is offline
@ -497,7 +527,8 @@ public class PartitionChangeBuilderTest { @@ -497,7 +527,8 @@ public class PartitionChangeBuilderTest {
FOO_ID,
0,
brokerId -> true,
metadataVersion
metadataVersion,
2
);
onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
@ -534,7 +565,8 @@ public class PartitionChangeBuilderTest { @@ -534,7 +565,8 @@ public class PartitionChangeBuilderTest {
FOO_ID,
0,
brokerId -> brokerId == leaderId,
metadataVersion
metadataVersion,
2
).setElection(Election.UNCLEAN);
onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// The partition should stay as recovering
@ -603,7 +635,8 @@ public class PartitionChangeBuilderTest { @@ -603,7 +635,8 @@ public class PartitionChangeBuilderTest {
topicId,
0,
isValidLeader,
leaderRecoveryMetadataVersion(false)
leaderRecoveryMetadataVersion(false),
2
);
// Before we build the new PartitionChangeBuilder, confirm the current leader is 0.
@ -617,7 +650,7 @@ public class PartitionChangeBuilderTest { @@ -617,7 +650,7 @@ public class PartitionChangeBuilderTest {
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(NO_LEADER),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
(short) 0)),
partitionChangeBuilder.setTargetIsr(Arrays.asList(0, 1, 2, 3)).
build());
}
@ -629,4 +662,180 @@ public class PartitionChangeBuilderTest { @@ -629,4 +662,180 @@ public class PartitionChangeBuilderTest {
return MetadataVersion.IBP_3_1_IV0;
}
}
/**
 * Shrinks the ISR of a four-replica partition from {1, 2, 3, 4} to {1, 2} while the
 * builder is constructed with 3 as its last argument (the min ISR, per the test name).
 *
 * For record versions above 0 the change record is expected to carry ELR = {3, 4} and the
 * merged registration must expose that ELR with an empty lastKnownElr. For version 0 the
 * expected record sets no ELR field and the merged registration keeps both arrays empty.
 */
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) {
    final boolean elrInRecord = version > 0;

    PartitionRegistration original = new PartitionRegistration.Builder()
        .setReplicas(new int[] {1, 2, 3, 4})
        .setIsr(new int[] {1, 2, 3, 4})
        .setLeader(1)
        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
        .setLeaderEpoch(100)
        .setPartitionEpoch(200)
        .build();
    Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

    PartitionChangeBuilder changeBuilder = new PartitionChangeBuilder(
            original,
            topicId,
            0,
            r -> r != 3,
            metadataVersionForPartitionChangeRecordVersion(version),
            3)
        .setElection(Election.PREFERRED)
        .setEligibleLeaderReplicasEnabled(isElrEnabled(version));
    // Request the new ISR {1, 2}.
    changeBuilder.setTargetIsrWithBrokerStates(
        AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2)));

    PartitionChangeRecord expectedChange = new PartitionChangeRecord()
        .setTopicId(topicId)
        .setPartitionId(0)
        .setIsr(Arrays.asList(1, 2))
        .setLeader(-2)
        .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
    if (elrInRecord) {
        expectedChange.setEligibleLeaderReplicas(Arrays.asList(3, 4));
    }
    ApiMessageAndVersion expected = new ApiMessageAndVersion(expectedChange, version);
    assertEquals(Optional.of(expected), changeBuilder.build());

    // Apply the generated change to the registration and inspect the resulting ELR state.
    PartitionChangeRecord builtChange = (PartitionChangeRecord) changeBuilder.build().get().message();
    PartitionRegistration merged = original.merge(builtChange);
    if (!elrInRecord) {
        assertEquals(0, merged.elr.length);
        assertEquals(0, merged.lastKnownElr.length);
    } else {
        assertTrue(Arrays.equals(new int[]{3, 4}, merged.elr), merged.toString());
        assertTrue(Arrays.equals(new int[]{}, merged.lastKnownElr), merged.toString());
    }
}
/**
 * Starts from ISR = {1, 2}, ELR = {3}, lastKnownElr = {4} and expands the ISR to {1, 2, 3}
 * with a min ISR of 3.
 *
 * For every record version the expected change record sets both ELR and lastKnownElr to
 * empty lists, and the merged registration must end up with both arrays empty.
 */
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {
    PartitionRegistration original = new PartitionRegistration.Builder()
        .setReplicas(new int[] {1, 2, 3, 4})
        .setIsr(new int[] {1, 2})
        .setElr(new int[] {3})
        .setLastKnownElr(new int[] {4})
        .setLeader(1)
        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
        .setLeaderEpoch(100)
        .setPartitionEpoch(200)
        .build();
    Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

    // The trailing constructor argument is the min ISR (3).
    PartitionChangeBuilder changeBuilder = new PartitionChangeBuilder(
            original,
            topicId,
            0,
            r -> r != 3,
            metadataVersionForPartitionChangeRecordVersion(version),
            3)
        .setElection(Election.PREFERRED)
        .setEligibleLeaderReplicasEnabled(isElrEnabled(version));
    changeBuilder.setTargetIsrWithBrokerStates(
        AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3)));

    PartitionChangeRecord expectedChange = new PartitionChangeRecord()
        .setTopicId(topicId)
        .setPartitionId(0)
        .setIsr(Arrays.asList(1, 2, 3))
        .setLeader(-2)
        .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
    // Regardless of the record version, ELR and lastKnownElr are expected as empty lists.
    expectedChange
        .setEligibleLeaderReplicas(Collections.emptyList())
        .setLastKnownELR(Collections.emptyList());
    ApiMessageAndVersion expected = new ApiMessageAndVersion(expectedChange, version);
    assertEquals(Optional.of(expected), changeBuilder.build());

    PartitionChangeRecord builtChange = (PartitionChangeRecord) changeBuilder.build().get().message();
    PartitionRegistration merged = original.merge(builtChange);
    assertEquals(0, merged.elr.length);
    assertEquals(0, merged.lastKnownElr.length);
}
/**
 * Starts from ISR = {1}, ELR = {3}, lastKnownElr = {2} and grows the ISR to {1, 4} with a
 * min ISR of 3, i.e. the new ISR member is in neither ELR nor lastKnownElr.
 *
 * For record versions above 0 the expected record leaves the ELR fields unset and the merged
 * registration keeps ELR = {3} and lastKnownElr = {2}. For version 0 the expected record
 * sets both fields to empty lists and the merged registration has both arrays empty.
 */
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_IsrAddNewMemberNotInELR(short version) {
    PartitionRegistration original = new PartitionRegistration.Builder()
        .setReplicas(new int[] {1, 2, 3, 4})
        .setIsr(new int[] {1})
        .setElr(new int[] {3})
        .setLastKnownElr(new int[] {2})
        .setLeader(1)
        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
        .setLeaderEpoch(100)
        .setPartitionEpoch(200)
        .build();
    Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

    // The trailing constructor argument is the min ISR (3).
    PartitionChangeBuilder changeBuilder = new PartitionChangeBuilder(
            original,
            topicId,
            0,
            r -> r != 3,
            metadataVersionForPartitionChangeRecordVersion(version),
            3)
        .setElection(Election.PREFERRED)
        .setEligibleLeaderReplicasEnabled(isElrEnabled(version));
    changeBuilder.setTargetIsrWithBrokerStates(
        AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 4)));

    PartitionChangeRecord expectedChange = new PartitionChangeRecord()
        .setTopicId(topicId)
        .setPartitionId(0)
        .setIsr(Arrays.asList(1, 4))
        .setLeader(-2)
        .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
    if (version == 0) {
        // Version 0 is the only case where the expected record touches the ELR fields.
        expectedChange
            .setEligibleLeaderReplicas(Collections.emptyList())
            .setLastKnownELR(Collections.emptyList());
    }
    ApiMessageAndVersion expected = new ApiMessageAndVersion(expectedChange, version);
    assertEquals(Optional.of(expected), changeBuilder.build());

    PartitionChangeRecord builtChange = (PartitionChangeRecord) changeBuilder.build().get().message();
    PartitionRegistration merged = original.merge(builtChange);
    final String description = merged.toString();
    if (version > 0) {
        assertTrue(Arrays.equals(new int[]{3}, merged.elr), description);
        assertTrue(Arrays.equals(new int[]{2}, merged.lastKnownElr), description);
    } else {
        assertTrue(Arrays.equals(new int[]{}, merged.elr), description);
        assertTrue(Arrays.equals(new int[]{}, merged.lastKnownElr), description);
    }
}
/**
 * Starts from ISR = {1}, ELR = {2, 3}, empty lastKnownElr, and reports replica 3 as an
 * unclean-shutdown replica without changing the ISR.
 *
 * For record versions above 0 the expected record moves 3 out of ELR (ELR = {2}) and into
 * lastKnownElr ({3}); the merged registration must match. For version 0 the expected record
 * only sets an empty ELR and the merged registration has both arrays empty.
 */
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_RemoveUncleanShutdownReplicasFromElr(short version) {
    PartitionRegistration original = new PartitionRegistration.Builder()
        .setReplicas(new int[] {1, 2, 3, 4})
        .setIsr(new int[] {1})
        .setElr(new int[] {2, 3})
        .setLastKnownElr(new int[] {})
        .setLeader(1)
        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
        .setLeaderEpoch(100)
        .setPartitionEpoch(200)
        .build();
    Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");

    // The trailing constructor argument is the min ISR (3).
    PartitionChangeBuilder changeBuilder = new PartitionChangeBuilder(
            original,
            topicId,
            0,
            r -> r != 3,
            metadataVersionForPartitionChangeRecordVersion(version),
            3)
        .setElection(Election.PREFERRED)
        .setEligibleLeaderReplicasEnabled(isElrEnabled(version));
    changeBuilder.setUncleanShutdownReplicas(Arrays.asList(3));

    PartitionChangeRecord expectedChange = new PartitionChangeRecord()
        .setTopicId(topicId)
        .setPartitionId(0)
        .setLeader(-2)
        .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
    if (version > 0) {
        expectedChange
            .setEligibleLeaderReplicas(Arrays.asList(2))
            .setLastKnownELR(Arrays.asList(3));
    } else {
        expectedChange.setEligibleLeaderReplicas(Collections.emptyList());
    }
    ApiMessageAndVersion expected = new ApiMessageAndVersion(expectedChange, version);
    assertEquals(Optional.of(expected), changeBuilder.build());

    PartitionChangeRecord builtChange = (PartitionChangeRecord) changeBuilder.build().get().message();
    PartitionRegistration merged = original.merge(builtChange);
    final String description = merged.toString();
    if (version > 0) {
        assertTrue(Arrays.equals(new int[]{2}, merged.elr), description);
        assertTrue(Arrays.equals(new int[]{3}, merged.lastKnownElr), description);
    } else {
        assertTrue(Arrays.equals(new int[]{}, merged.elr), description);
        assertTrue(Arrays.equals(new int[]{}, merged.lastKnownElr), description);
    }
}
}

6
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java

@ -188,7 +188,7 @@ public class QuorumControllerTest { @@ -188,7 +188,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
@ -231,7 +231,7 @@ public class QuorumControllerTest { @@ -231,7 +231,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@ -573,7 +573,7 @@ public class QuorumControllerTest { @@ -573,7 +573,7 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)).
setListeners(listeners));
assertEquals(5L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =

122
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

@ -21,6 +21,7 @@ import org.apache.kafka.common.ElectionType; @@ -21,6 +21,7 @@ import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
@ -159,6 +160,7 @@ public class ReplicationControlManagerTest { @@ -159,6 +160,7 @@ public class ReplicationControlManagerTest {
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private MetadataVersion metadataVersion = MetadataVersion.latest();
private MockTime mockTime = new MockTime();
private boolean isElrEnabled = false;
Builder setCreateTopicPolicy(CreateTopicPolicy createTopicPolicy) {
this.createTopicPolicy = Optional.of(createTopicPolicy);
@ -170,6 +172,11 @@ public class ReplicationControlManagerTest { @@ -170,6 +172,11 @@ public class ReplicationControlManagerTest {
return this;
}
/**
 * Sets the ELR flag that this builder hands to the ReplicationControlTestContext constructor.
 *
 * The parameter is a primitive boolean, matching the primitive field it is stored in, so a
 * null can never reach the assignment and fail on unboxing inside this setter.
 *
 * @param isElrEnabled value to store in the builder's isElrEnabled field
 * @return this builder, to allow call chaining
 */
Builder setIsElrEnabled(boolean isElrEnabled) {
    this.isElrEnabled = isElrEnabled;
    return this;
}
Builder setMockTime(MockTime mockTime) {
this.mockTime = mockTime;
return this;
@ -178,7 +185,15 @@ public class ReplicationControlManagerTest { @@ -178,7 +185,15 @@ public class ReplicationControlManagerTest {
ReplicationControlTestContext build() {
return new ReplicationControlTestContext(metadataVersion,
createTopicPolicy,
mockTime);
mockTime,
isElrEnabled);
}
/**
 * Builds a test context using the supplied metadata version together with this builder's
 * create-topic policy, mock time and ELR flag.
 *
 * The parameter shadows the builder's own metadataVersion field, so the field value is
 * not consulted by this overload.
 *
 * @param metadataVersion metadata version passed to the test context constructor
 * @return the newly constructed test context
 */
ReplicationControlTestContext build(MetadataVersion metadataVersion) {
    ReplicationControlTestContext context = new ReplicationControlTestContext(
        metadataVersion,
        createTopicPolicy,
        mockTime,
        isElrEnabled);
    return context;
}
}
@ -202,7 +217,8 @@ public class ReplicationControlManagerTest { @@ -202,7 +217,8 @@ public class ReplicationControlManagerTest {
private ReplicationControlTestContext(
MetadataVersion metadataVersion,
Optional<CreateTopicPolicy> createTopicPolicy,
MockTime time
MockTime time,
Boolean isElrEnabled
) {
this.time = time;
this.featureControl = new FeatureControlManager.Builder().
@ -229,6 +245,7 @@ public class ReplicationControlManagerTest { @@ -229,6 +245,7 @@ public class ReplicationControlManagerTest {
setClusterControl(clusterControl).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
setEligibleLeaderReplicasEnabled(isElrEnabled).
build();
clusterControl.activate();
}
@ -880,6 +897,91 @@ public class ReplicationControlManagerTest { @@ -880,6 +897,91 @@ public class ReplicationControlManagerTest {
assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, expandIsrResponse);
}
/**
 * With ELR enabled, creates topic "foo" with one partition on brokers {0, 1, 2}, sets
 * min.insync.replicas to 2, and then drives two AlterPartition requests from broker 0:
 * a shrink to ISR {0}, after which ELR is expected to be {1, 2}, and an expand to
 * ISR {0, 1}, after which ELR is expected to be empty. lastKnownElr is expected to stay
 * empty throughout.
 */
@Test
public void testEligibleLeaderReplicas_ShrinkAndExpandIsr() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
// Broker 0 is the initial leader; its epoch is reused for both AlterPartition requests below.
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
long brokerEpoch = ctx.currentBrokerEpoch(0);
ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");
// Change ISR to {0}.
PartitionData shrinkIsrRequest = newAlterPartition(
replicationControl, topicIdPartition, isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
ControllerResult<AlterPartitionResponseData> shrinkIsrResult = sendAlterPartition(
replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), shrinkIsrRequest);
AlterPartitionResponseData.PartitionData shrinkIsrResponse = assertAlterPartitionResponse(
shrinkIsrResult, topicIdPartition, NONE);
assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
// After the shrink, the replicas removed from the ISR are expected in ELR.
PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertTrue(Arrays.equals(new int[]{1, 2}, partition.elr), partition.toString());
assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
// Change ISR to {0, 1}, which matches the configured min.insync.replicas of 2.
PartitionData expandIsrRequest = newAlterPartition(
replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
ControllerResult<AlterPartitionResponseData> expandIsrResult = sendAlterPartition(
replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), expandIsrRequest);
AlterPartitionResponseData.PartitionData expandIsrResponse = assertAlterPartitionResponse(
expandIsrResult, topicIdPartition, NONE);
assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, expandIsrResponse);
// After the expand, both ELR and lastKnownElr are expected to be empty.
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertTrue(Arrays.equals(new int[]{}, partition.elr), partition.toString());
assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
}
/**
 * With ELR enabled, creates topic "foo" with one partition on brokers {0, 1, 2, 3}, sets
 * min.insync.replicas to 3, and checks the partition's ELR after each fencing step:
 * fencing {2, 3} is expected to yield ELR {3}, fencing {1, 2, 3} is expected to yield
 * ELR {1, 3}, and unfencing all brokers is expected to leave ELR at {1, 3}.
 * lastKnownElr is expected to stay empty throughout.
 */
@Test
public void testEligibleLeaderReplicas_BrokerFence() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3);
ctx.unfenceBrokers(0, 1, 2, 3);
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2, 3}});
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");
// NOTE(review): only broker 3 is expected in ELR here, presumably because removing broker 2
// alone still leaves three ISR members; confirm against the order in which fenceBrokers
// processes the set.
ctx.fenceBrokers(Utils.mkSet(2, 3));
PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertTrue(Arrays.equals(new int[]{3}, partition.elr), partition.toString());
assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
// Additionally fencing broker 1 is expected to add it to ELR.
ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertTrue(Arrays.equals(new int[]{1, 3}, partition.elr), partition.toString());
assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
// Unfencing alone is expected to leave ELR unchanged.
ctx.unfenceBrokers(0, 1, 2, 3);
partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
assertTrue(Arrays.equals(new int[]{1, 3}, partition.elr), partition.toString());
assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
}
/**
 * With ELR enabled, creates topic "foo" with one partition on three replicas, sets
 * min.insync.replicas to 5, and asserts that getTopicEffectiveMinIsr("foo") returns 3.
 */
@Test
public void testEligibleLeaderReplicas_EffectiveMinIsr() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][]{new int[]{0, 1, 2}});
TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
// The configured value (5) exceeds the number of replicas (3).
// NOTE(review): the expected result of 3 suggests the effective value is capped at the
// replica count; confirm in getTopicEffectiveMinIsr.
ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "5");
assertEquals(3, replicationControl.getTopicEffectiveMinIsr("foo"));
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionHandleUnknownTopicIdOrName(short version) throws Exception {
@ -1504,7 +1606,7 @@ public class ReplicationControlManagerTest { @@ -1504,7 +1606,7 @@ public class ReplicationControlManagerTest {
setReplicas(asList(2, 1, 3)).
setLeader(3).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()), (short) 0)),
setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
@ -1855,7 +1957,7 @@ public class ReplicationControlManagerTest { @@ -1855,7 +1957,7 @@ public class ReplicationControlManagerTest {
setLeader(4).
setReplicas(asList(2, 3, 4)).
setRemovingReplicas(null).
setAddingReplicas(Collections.emptyList()), (short) 0)),
setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
@ -1908,8 +2010,8 @@ public class ReplicationControlManagerTest { @@ -1908,8 +2010,8 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
public void testElectUncleanLeaders_WithoutElr(boolean electAllPartitions) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(MetadataVersion.IBP_3_6_IV1);
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@ -2186,13 +2288,13 @@ public class ReplicationControlManagerTest { @@ -2186,13 +2288,13 @@ public class ReplicationControlManagerTest {
setPartitionId(0).
setTopicId(fooId).
setLeader(1),
(short) 0),
MetadataVersion.latest().partitionChangeRecordVersion()),
new ApiMessageAndVersion(
new PartitionChangeRecord().
setPartitionId(2).
setTopicId(fooId).
setLeader(0),
(short) 0)),
MetadataVersion.latest().partitionChangeRecordVersion())),
election2Result.records());
}
@ -2235,7 +2337,7 @@ public class ReplicationControlManagerTest { @@ -2235,7 +2337,7 @@ public class ReplicationControlManagerTest {
.setPartitionId(0)
.setTopicId(fooId)
.setLeader(1);
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, (short) 0)), balanceResult.records());
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latest().partitionChangeRecordVersion())), balanceResult.records());
assertTrue(replication.arePartitionLeadersImbalanced());
assertFalse(balanceResult.response());
@ -2267,7 +2369,7 @@ public class ReplicationControlManagerTest { @@ -2267,7 +2369,7 @@ public class ReplicationControlManagerTest {
.setPartitionId(2)
.setTopicId(fooId)
.setLeader(0);
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, (short) 0)), balanceResult.records());
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latest().partitionChangeRecordVersion())), balanceResult.records());
assertFalse(replication.arePartitionLeadersImbalanced());
assertFalse(balanceResult.response());
}

12
metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java

@ -160,15 +160,15 @@ public class ControllerMetricsChangesTest { @@ -160,15 +160,15 @@ public class ControllerMetricsChangesTest {
static {
TOPIC_DELTA1 = new TopicDelta(new TopicImage("foo", FOO_ID, Collections.emptyMap()));
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 0).message());
toRecord(FOO_ID, 0, (short) 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 1).message());
toRecord(FOO_ID, 1, (short) 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 2).message());
toRecord(FOO_ID, 2, (short) 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NON_PREFERRED_LEADER).
toRecord(FOO_ID, 3).message());
toRecord(FOO_ID, 3, (short) 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NON_PREFERRED_LEADER).
toRecord(FOO_ID, 4).message());
toRecord(FOO_ID, 4, (short) 0).message());
TOPIC_DELTA2 = new TopicDelta(TOPIC_DELTA1.apply());
TOPIC_DELTA2.replay(new PartitionChangeRecord().
@ -176,7 +176,7 @@ public class ControllerMetricsChangesTest { @@ -176,7 +176,7 @@ public class ControllerMetricsChangesTest {
setTopicId(FOO_ID).
setLeader(1));
TOPIC_DELTA2.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 5).message());
toRecord(FOO_ID, 5, (short) 0).message());
}
@Test

77
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java

@ -30,9 +30,14 @@ import org.apache.kafka.common.metadata.PartitionRecord; @@ -30,9 +30,14 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -43,6 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; @@ -43,6 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@Timeout(40)
public class PartitionRegistrationTest {
/**
 * Supplies one argument set per PartitionRecord version, from the lowest to the highest
 * supported version inclusive, each wrapped as a short.
 */
private static Stream<Arguments> partitionRecordVersions() {
    return IntStream
        .rangeClosed(PartitionRecord.LOWEST_SUPPORTED_VERSION, PartitionRecord.HIGHEST_SUPPORTED_VERSION)
        .mapToObj(v -> Arguments.of((short) v));
}
@Test
public void testElectionWasClean() {
assertTrue(PartitionRegistration.electionWasClean(1, new int[]{1, 2}));
@ -58,12 +66,12 @@ public class PartitionRegistrationTest { @@ -58,12 +66,12 @@ public class PartitionRegistrationTest {
PartitionRegistration b = new PartitionRegistration.Builder().
setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{3}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(1).build();
PartitionRegistration c = new PartitionRegistration.Builder().
setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1}).setLastKnownElr(new int[]{3}).setElr(new int[]{2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
assertEquals(b, a.merge(new PartitionChangeRecord().
setLeader(3).setIsr(Arrays.asList(3))));
assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, partitionEpoch: 0 -> 1",
b.diff(a));
assertEquals("isr: [1, 2] -> [1], partitionEpoch: 0 -> 1",
assertEquals("isr: [1, 2] -> [1], elr: [] -> [2], lastKnownElr: [] -> [3], partitionEpoch: 0 -> 1",
c.diff(a));
}
@ -73,7 +81,7 @@ public class PartitionRegistrationTest { @@ -73,7 +81,7 @@ public class PartitionRegistrationTest {
setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setRemovingReplicas(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
int partitionId = 4;
ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId);
ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId, (short) 0);
PartitionRegistration registrationB =
new PartitionRegistration((PartitionRecord) record.message());
assertEquals(registrationA, registrationB);
@ -200,8 +208,10 @@ public class PartitionRegistrationTest { @@ -200,8 +208,10 @@ public class PartitionRegistrationTest {
@Test
public void testBuilderSuccess() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1}).
setReplicas(new int[]{0, 1, 2}).
setIsr(new int[]{0, 1}).
setElr(new int[]{2}).
setLastKnownElr(new int[]{0, 1, 2}).
setRemovingReplicas(new int[]{0}).
setAddingReplicas(new int[]{1}).
setLeader(0).
@ -209,8 +219,10 @@ public class PartitionRegistrationTest { @@ -209,8 +219,10 @@ public class PartitionRegistrationTest {
setLeaderEpoch(0).
setPartitionEpoch(0);
PartitionRegistration partitionRegistration = builder.build();
assertEquals(Replicas.toList(new int[]{0, 1}), Replicas.toList(partitionRegistration.replicas));
assertEquals(Replicas.toList(new int[]{0, 1, 2}), Replicas.toList(partitionRegistration.replicas));
assertEquals(Replicas.toList(new int[]{0, 1}), Replicas.toList(partitionRegistration.isr));
assertEquals(Replicas.toList(new int[]{2}), Replicas.toList(partitionRegistration.elr));
assertEquals(Replicas.toList(new int[]{0, 1, 2}), Replicas.toList(partitionRegistration.lastKnownElr));
assertEquals(Replicas.toList(new int[]{0}), Replicas.toList(partitionRegistration.removingReplicas));
assertEquals(Replicas.toList(new int[]{1}), Replicas.toList(partitionRegistration.addingReplicas));
assertEquals(0, partitionRegistration.leader);
@ -233,17 +245,49 @@ public class PartitionRegistrationTest { @@ -233,17 +245,49 @@ public class PartitionRegistrationTest {
assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
}
/**
 * Converts a registration that has ELR = {2, 3} and lastKnownElr = {4} to a PartitionRecord
 * at the given version and compares it with a hand-built record. The expected record only
 * carries the ELR fields for versions above 0.
 */
@ParameterizedTest
@MethodSource("partitionRecordVersions")
public void testPartitionRegistrationToRecord(short version) {
    PartitionRegistration partitionRegistration = new PartitionRegistration.Builder().
        setReplicas(new int[]{0, 1, 2, 3, 4}).
        setIsr(new int[]{0, 1}).
        setLeader(0).
        setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
        setLeaderEpoch(0).
        setPartitionEpoch(0).
        setElr(new int[]{2, 3}).
        setLastKnownElr(new int[]{4}).
        build();
    Uuid topicID = Uuid.randomUuid();

    PartitionRecord expectRecord = new PartitionRecord().
        setTopicId(topicID).
        setPartitionId(0).
        setReplicas(Arrays.asList(0, 1, 2, 3, 4)).
        setIsr(Arrays.asList(0, 1)).
        setLeader(0).
        setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
        setLeaderEpoch(0).
        setPartitionEpoch(0);
    if (version > 0) {
        // Only versions above 0 are expected to serialize the ELR fields.
        expectRecord.
            setEligibleLeaderReplicas(Arrays.asList(2, 3)).
            setLastKnownELR(Arrays.asList(4));
    }

    ApiMessageAndVersion expected = new ApiMessageAndVersion(expectRecord, version);
    assertEquals(expected, partitionRegistration.toRecord(topicID, 0, version));
    // addingReplicas was never set on the builder, so it is expected to equal Replicas.NONE.
    assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
}
@Property
public void testConsistentEqualsAndHashCode(
@ForAll("uniqueSamples") PartitionRegistration a,
@ForAll("uniqueSamples") PartitionRegistration b
) {
if (a.equals(b)) {
assertEquals(a.hashCode(), b.hashCode());
assertEquals(a.hashCode(), b.hashCode(), "a=" + a + "\nb=" + b);
}
if (a.hashCode() != b.hashCode()) {
assertNotEquals(a, b);
assertNotEquals(a, b, "a=" + a + "\nb=" + b);
}
}
@ -251,24 +295,23 @@ public class PartitionRegistrationTest { @@ -251,24 +295,23 @@ public class PartitionRegistrationTest {
Arbitrary<PartitionRegistration> uniqueSamples() {
return Arbitraries.of(
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(101).setPartitionEpoch(200).build(),
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(101).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(201).build(),
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(201).setElr(new int[] {1, 2}).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING).setLeaderEpoch(100).setPartitionEpoch(200).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {4, 5, 6}).setAddingReplicas(new int[] {1, 2, 3}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 2, 3}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 3}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build()
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {2, 3}).setLastKnownElr(new int[] {1, 2}).build()
);
}
}

25
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

@ -186,7 +186,10 @@ public enum MetadataVersion { @@ -186,7 +186,10 @@ public enum MetadataVersion {
IBP_3_6_IV2(14, "3.6", "IV2", true),
// Implement KIP-919 controller registration.
IBP_3_7_IV0(15, "3.7", "IV0", true);
IBP_3_7_IV0(15, "3.7", "IV0", true),
// Add ELR-related support (KIP-966).
IBP_3_7_IV1(16, "3.7", "IV1", true);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the latest version
@ -286,6 +289,10 @@ public enum MetadataVersion { @@ -286,6 +289,10 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_6_IV2);
}
/**
 * Reports whether Eligible Leader Replicas (ELR) are supported at this metadata version.
 *
 * @return true when this version is at least {@code IBP_3_7_IV1}
 */
public boolean isElrSupported() {
    return isAtLeast(MetadataVersion.IBP_3_7_IV1);
}
/**
 * Reports whether KRaft is supported at this metadata version.
 *
 * @return true when this version's feature level is greater than zero
 */
public boolean isKRaftSupported() {
    return featureLevel > 0;
}
@ -336,6 +343,22 @@ public enum MetadataVersion { @@ -336,6 +343,22 @@ public enum MetadataVersion {
return this.isAtLeast(MetadataVersion.IBP_3_7_IV0);
}
/**
 * Selects the PartitionChangeRecord version for this metadata version.
 *
 * @return 1 when ELR is supported, otherwise 0
 */
public short partitionChangeRecordVersion() {
    return isElrSupported() ? (short) 1 : (short) 0;
}
/**
 * Selects the PartitionRecord version for this metadata version.
 *
 * @return 1 when ELR is supported, otherwise 0
 */
public short partitionRecordVersion() {
    // Versions without ELR support keep the original record version.
    if (!isElrSupported()) {
        return (short) 0;
    }
    return (short) 1;
}
public short fetchRequestVersion() {
if (this.isAtLeast(IBP_3_5_IV1)) {
return 15;

18
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java

@ -164,6 +164,9 @@ class MetadataVersionTest { @@ -164,6 +164,9 @@ class MetadataVersionTest {
assertEquals(IBP_3_6_IV0, MetadataVersion.fromVersionString("3.6-IV0"));
assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1"));
assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2"));
assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0"));
assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1"));
}
@Test
@ -218,6 +221,8 @@ class MetadataVersionTest { @@ -218,6 +221,8 @@ class MetadataVersionTest {
assertEquals("3.6", IBP_3_6_IV0.shortVersion());
assertEquals("3.6", IBP_3_6_IV1.shortVersion());
assertEquals("3.6", IBP_3_6_IV2.shortVersion());
assertEquals("3.7", IBP_3_7_IV0.shortVersion());
assertEquals("3.7", IBP_3_7_IV1.shortVersion());
}
@Test
@ -261,6 +266,8 @@ class MetadataVersionTest { @@ -261,6 +266,8 @@ class MetadataVersionTest {
assertEquals("3.6-IV0", IBP_3_6_IV0.version());
assertEquals("3.6-IV1", IBP_3_6_IV1.version());
assertEquals("3.6-IV2", IBP_3_6_IV2.version());
assertEquals("3.7-IV0", IBP_3_7_IV0.version());
assertEquals("3.7-IV1", IBP_3_7_IV1.version());
}
@Test
@ -319,6 +326,17 @@ class MetadataVersionTest { @@ -319,6 +326,17 @@ class MetadataVersionTest {
metadataVersion.isDelegationTokenSupported());
}
/**
 * Verifies, for every MetadataVersion, that ELR support and the version-1
 * PartitionRecord / PartitionChangeRecord schemas are reported exactly for
 * IBP_3_7_IV1 and any later version, and version 0 otherwise.
 */
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testIsElrSupported(MetadataVersion metadataVersion) {
    // Compare with isAtLeast rather than equals: isElrSupported() is defined as
    // isAtLeast(IBP_3_7_IV1), so an equality check would start failing as soon as
    // a metadata version newer than IBP_3_7_IV1 is added to the enum.
    boolean expectElrSupported = metadataVersion.isAtLeast(IBP_3_7_IV1);
    assertEquals(expectElrSupported, metadataVersion.isElrSupported());

    // Both record types move to version 1 together with ELR support.
    short expectedRecordVersion = expectElrSupported ? (short) 1 : (short) 0;
    assertEquals(expectedRecordVersion, metadataVersion.partitionRecordVersion());
    assertEquals(expectedRecordVersion, metadataVersion.partitionChangeRecordVersion());
}
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {

6
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java

@ -68,7 +68,7 @@ public class FeatureCommandTest { @@ -68,7 +68,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.7-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
"SupportedMaxVersion: 3.7-IV1\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
}
@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV0)
@ -78,7 +78,7 @@ public class FeatureCommandTest { @@ -78,7 +78,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.7-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput));
"SupportedMaxVersion: 3.7-IV1\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput));
}
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
@ -137,7 +137,7 @@ public class FeatureCommandTest { @@ -137,7 +137,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
"metadata.version. Local controller 3000 only supports versions 1-15", commandOutput);
"metadata.version. Local controller 3000 only supports versions 1-16", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),

Loading…
Cancel
Save