diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dd2c6c98608..dfbe0286322 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -307,6 +307,8 @@
files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
+
BrokerRole
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 7cd845da26e..3e09dc8fbe8 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testDefaults(ClusterConfig config) {
- Assertions.assertEquals(MetadataVersion.IBP_3_7_IV0, config.metadataVersion());
+ Assertions.assertEquals(MetadataVersion.IBP_3_7_IV1, config.metadataVersion());
}
}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index 886958533c2..150aa7e7d71 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -41,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
- MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV0;
+ MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV1;
ClusterConfigProperty[] serverProperties() default {};
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index a37276f2b9e..4ef66f1870a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -44,6 +44,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
@@ -434,6 +435,25 @@ public class ConfigurationControlManager {
}
}
+    /**
+     * Get the value of the given config key for the given topic.
+     *
+     * A value stored explicitly for the topic takes precedence. When the topic has no entry
+     * for the key, the value is looked up in the map returned by computeEffectiveTopicConfigs
+     * when it is called with no topic-level overrides.
+     *
+     * NOTE(review): nothing in this method body throws NoSuchElementException itself; confirm
+     * whether the declared throws clause is still needed.
+     *
+     * @param topicName The topic name for the config.
+     * @param configKey The key for the config.
+     * @return          The config value, or null if the key is found in neither place.
+     */
+    String getTopicConfig(String topicName, String configKey) throws NoSuchElementException {
+        Map map = configData.get(new ConfigResource(Type.TOPIC, topicName));
+        // Fast path: the topic carries its own value for this key.
+        if (map != null && map.containsKey(configKey)) {
+            return map.get(configKey);
+        }
+        // Otherwise fall back to the effective config computed without topic overrides.
+        Map effectiveConfigMap = computeEffectiveTopicConfigs(Collections.emptyMap());
+        if (effectiveConfigMap.containsKey(configKey)) {
+            return effectiveConfigMap.get(configKey).value();
+        }
+        return null;
+    }
+
public Map>> describeConfigs(
long lastCommittedOffset, Map> resources) {
Map>> results = new HashMap<>();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 78d22967e2a..d8999638031 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -17,9 +17,12 @@
package org.apache.kafka.controller;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
@@ -44,6 +47,8 @@ public class PartitionChangeBuilder {
public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
if (record.isr() != null) return false;
+ if (record.eligibleLeaderReplicas() != null) return false;
+ if (record.lastKnownELR() != null) return false;
if (record.leader() != NO_LEADER_CHANGE) return false;
if (record.replicas() != null) return false;
if (record.removingReplicas() != null) return false;
@@ -79,9 +84,14 @@ public class PartitionChangeBuilder {
private List targetReplicas;
private List targetRemoving;
private List targetAdding;
+ private List targetElr;
+ private List targetLastKnownElr;
+ private List uncleanShutdownReplicas;
private Election election = Election.ONLINE;
private LeaderRecoveryState targetLeaderRecoveryState;
private boolean zkMigrationEnabled;
+ private boolean eligibleLeaderReplicasEnabled;
+ private int minISR;
public PartitionChangeBuilder(
@@ -89,7 +99,8 @@ public class PartitionChangeBuilder {
Uuid topicId,
int partitionId,
IntPredicate isAcceptableLeader,
- MetadataVersion metadataVersion
+ MetadataVersion metadataVersion,
+ int minISR
) {
this.partition = partition;
this.topicId = topicId;
@@ -97,11 +108,15 @@ public class PartitionChangeBuilder {
this.isAcceptableLeader = isAcceptableLeader;
this.metadataVersion = metadataVersion;
this.zkMigrationEnabled = false;
+ this.eligibleLeaderReplicasEnabled = false;
+ this.minISR = minISR;
this.targetIsr = Replicas.toList(partition.isr);
this.targetReplicas = Replicas.toList(partition.replicas);
this.targetRemoving = Replicas.toList(partition.removingReplicas);
this.targetAdding = Replicas.toList(partition.addingReplicas);
+ this.targetElr = Replicas.toList(partition.elr);
+ this.targetLastKnownElr = Replicas.toList(partition.lastKnownElr);
this.targetLeaderRecoveryState = partition.leaderRecoveryState;
}
@@ -124,6 +139,11 @@ public class PartitionChangeBuilder {
return this;
}
+ /**
+ * Set the replicas that are left out of the ELR when it is recomputed.
+ *
+ * @param uncleanShutdownReplicas The replica ids to exclude; if this is never set (null), no replica is excluded for this reason.
+ * @return This builder.
+ */
+ public PartitionChangeBuilder setUncleanShutdownReplicas(List uncleanShutdownReplicas) {
+ this.uncleanShutdownReplicas = uncleanShutdownReplicas;
+ return this;
+ }
+
public PartitionChangeBuilder setElection(Election election) {
this.election = election;
return this;
@@ -149,6 +169,11 @@ public class PartitionChangeBuilder {
return this;
}
+ /**
+ * Turn ELR handling on or off for this builder. The constructor leaves it off.
+ * While it is off, the target ELR is not recomputed and both ELR lists are reset to empty
+ * before they are compared against the partition's current values.
+ *
+ * @param eligibleLeaderReplicasEnabled True to enable ELR handling.
+ * @return This builder.
+ */
+ public PartitionChangeBuilder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
+ this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
+ return this;
+ }
+
// VisibleForTesting
static class ElectionResult {
final int node;
@@ -328,15 +353,20 @@ public class PartitionChangeBuilder {
completeReassignmentIfNeeded();
+ maybePopulateTargetElr();
+
tryElection(record);
triggerLeaderEpochBumpIfNeeded(record);
+ maybeUpdateRecordElr(record);
+
if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
// Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
record.setIsr(targetIsr);
}
+
setAssignmentChanges(record);
if (targetLeaderRecoveryState != partition.leaderRecoveryState) {
@@ -346,7 +376,7 @@ public class PartitionChangeBuilder {
if (changeRecordIsNoOp(record)) {
return Optional.empty();
} else {
- return Optional.of(new ApiMessageAndVersion(record, (short) 0));
+ return Optional.of(new ApiMessageAndVersion(record, metadataVersion.partitionChangeRecordVersion()));
}
}
@@ -362,6 +392,58 @@ public class PartitionChangeBuilder {
}
}
+    /**
+     * Put the ELR and last known ELR changes, if any, on the partition change record.
+     *
+     * Both target lists are reset to empty when ELR is disabled or when the election was
+     * unclean. Each list is then written to the record only if it differs from the value the
+     * partition currently holds.
+     *
+     * @param record The change record being assembled.
+     */
+    private void maybeUpdateRecordElr(PartitionChangeRecord record) {
+        // The leader election can set the record ISR when it is unclean, so a non-null ISR
+        // at this point means an unclean election took place.
+        boolean uncleanElection = record.isr() != null;
+
+        if (uncleanElection || !eligibleLeaderReplicasEnabled) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+        }
+
+        boolean elrChanged = !targetElr.equals(Replicas.toList(partition.elr));
+        if (elrChanged) {
+            record.setEligibleLeaderReplicas(targetElr);
+        }
+
+        boolean lastKnownElrChanged = !targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr));
+        if (lastKnownElrChanged) {
+            record.setLastKnownELR(targetLastKnownElr);
+        }
+    }
+
+ /**
+ * Recompute targetElr and targetLastKnownElr from the partition's current ISR, the
+ * existing target lists and the target ISR. Does nothing when ELR is disabled.
+ * It is invoked before tryElection while the change record is being assembled.
+ */
+ private void maybePopulateTargetElr() {
+ if (!eligibleLeaderReplicasEnabled) return;
+
+ // If the ISR is larger than or equal to the min ISR, clear the ELR and lastKnownELR.
+ if (targetIsr.size() >= minISR) {
+ targetElr = Collections.emptyList();
+ targetLastKnownElr = Collections.emptyList();
+ return;
+ }
+
+ Set targetIsrSet = new HashSet<>(targetIsr);
+ // Tracking the ELR. The new ELR is expected to
+ // 1. Include the members of the current ISR that are not in the target ISR.
+ // 2. Exclude the replicas that appear in both the ELR and the target ISR.
+ // 3. Exclude unclean shutdown replicas.
+ // To do that, we first union the current ISR and current ELR, then filter out the target ISR and unclean shutdown
+ // replicas.
+ Set candidateSet = new HashSet<>(targetElr);
+ Arrays.stream(partition.isr).forEach(ii -> candidateSet.add(ii));
+ targetElr = candidateSet.stream()
+ .filter(replica -> !targetIsrSet.contains(replica))
+ .filter(replica -> uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica))
+ .collect(Collectors.toList());
+
+ // Calculate the new last known ELR. candidateSet still holds the union built above (current ELR plus current ISR);
+ // the existing last known ELR is added to it here. In order to reduce the metadata usage, the last known ELR
+ // excludes the members of the new ELR and of the target ISR. Unclean shutdown replicas are not filtered at this
+ // step, so they can remain in the last known ELR.
+ targetLastKnownElr.forEach(ii -> candidateSet.add(ii));
+ targetLastKnownElr = candidateSet.stream()
+ .filter(replica -> !targetIsrSet.contains(replica))
+ .filter(replica -> !targetElr.contains(replica))
+ .collect(Collectors.toList());
+ }
+
@Override
public String toString() {
return "PartitionChangeBuilder(" +
@@ -373,6 +455,9 @@ public class PartitionChangeBuilder {
", targetReplicas=" + targetReplicas +
", targetRemoving=" + targetRemoving +
", targetAdding=" + targetAdding +
+ ", targetElr=" + targetElr +
+ ", targetLastKnownElr=" + targetLastKnownElr +
+ ", uncleanShutdownReplicas=" + uncleanShutdownReplicas +
", election=" + election +
", targetLeaderRecoveryState=" + targetLeaderRecoveryState +
')';
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 6e111c55138..3422b3d406e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -203,6 +203,7 @@ public final class QuorumController implements Controller {
private QuorumFeatures quorumFeatures = null;
private short defaultReplicationFactor = 3;
private int defaultNumPartitions = 1;
+ private int defaultMinIsr = 1;
private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
@@ -215,6 +216,7 @@ public final class QuorumController implements Controller {
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
private boolean zkMigrationEnabled = false;
+ private boolean eligibleLeaderReplicasEnabled = false;
private DelegationTokenCache tokenCache;
private String tokenSecretKeyString;
private long delegationTokenMaxLifeMs;
@@ -280,6 +282,11 @@ public final class QuorumController implements Controller {
return this;
}
+ /**
+ * Set the default min ISR that is passed to the controller being built.
+ *
+ * @param defaultMinIsr The default min ISR; the builder uses 1 if this is never called.
+ * @return This builder.
+ */
+ public Builder setDefaultMinIsr(int defaultMinIsr) {
+ this.defaultMinIsr = defaultMinIsr;
+ return this;
+ }
+
public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
this.replicaPlacer = replicaPlacer;
return this;
@@ -340,6 +347,11 @@ public final class QuorumController implements Controller {
return this;
}
+ /**
+ * Set whether eligible leader replicas is enabled on the controller being built.
+ *
+ * @param eligibleLeaderReplicasEnabled True to enable it; the builder uses false if this is never called.
+ * @return This builder.
+ */
+ public Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
+ this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
+ return this;
+ }
+
public Builder setDelegationTokenCache(DelegationTokenCache tokenCache) {
this.tokenCache = tokenCache;
return this;
@@ -405,6 +417,7 @@ public final class QuorumController implements Controller {
quorumFeatures,
defaultReplicationFactor,
defaultNumPartitions,
+ defaultMinIsr,
replicaPlacer,
leaderImbalanceCheckIntervalNs,
maxIdleIntervalNs,
@@ -421,7 +434,8 @@ public final class QuorumController implements Controller {
tokenSecretKeyString,
delegationTokenMaxLifeMs,
delegationTokenExpiryTimeMs,
- delegationTokenExpiryCheckIntervalMs
+ delegationTokenExpiryCheckIntervalMs,
+ eligibleLeaderReplicasEnabled
);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
@@ -1746,6 +1760,8 @@ public final class QuorumController implements Controller {
private final boolean zkMigrationEnabled;
+ private final boolean eligibleLeaderReplicasEnabled;
+
/**
* The maximum number of records per batch to allow.
*/
@@ -1769,6 +1785,7 @@ public final class QuorumController implements Controller {
QuorumFeatures quorumFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
+ int defaultMinIsr,
ReplicaPlacer replicaPlacer,
OptionalLong leaderImbalanceCheckIntervalNs,
OptionalLong maxIdleIntervalNs,
@@ -1785,7 +1802,8 @@ public final class QuorumController implements Controller {
String tokenSecretKeyString,
long delegationTokenMaxLifeMs,
long delegationTokenExpiryTimeMs,
- long delegationTokenExpiryCheckIntervalMs
+ long delegationTokenExpiryCheckIntervalMs,
+ boolean eligibleLeaderReplicasEnabled
) {
this.nonFatalFaultHandler = nonFatalFaultHandler;
this.fatalFaultHandler = fatalFaultHandler;
@@ -1854,6 +1872,8 @@ public final class QuorumController implements Controller {
setLogContext(logContext).
setDefaultReplicationFactor(defaultReplicationFactor).
setDefaultNumPartitions(defaultNumPartitions).
+ setDefaultMinIsr(defaultMinIsr).
+ setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
@@ -1887,9 +1907,11 @@ public final class QuorumController implements Controller {
this.zkRecordConsumer = new MigrationRecordConsumer();
this.zkMigrationEnabled = zkMigrationEnabled;
this.recordRedactor = new RecordRedactor(configSchema);
+ this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
- log.info("Creating new QuorumController with clusterId {}.{}",
- clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : "");
+ log.info("Creating new QuorumController with clusterId {}.{}{}",
+ clusterId, zkMigrationEnabled ? " ZK migration mode is enabled." : "",
+ eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : "");
this.raftClient.register(metaLogListener);
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 7c6b6717773..e923e671dbb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -119,6 +119,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INELIGIBLE_REPLICA;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
@@ -150,11 +151,14 @@ public class ReplicationControlManager {
private LogContext logContext = null;
private short defaultReplicationFactor = (short) 3;
private int defaultNumPartitions = 1;
+
+ private int defaultMinIsr = 1;
private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
private ConfigurationControlManager configurationControl = null;
private ClusterControlManager clusterControl = null;
private Optional createTopicPolicy = Optional.empty();
private FeatureControlManager featureControl = null;
+ private boolean eligibleLeaderReplicasEnabled = false;
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
@@ -176,6 +180,16 @@ public class ReplicationControlManager {
return this;
}
+ /**
+ * Set the default min ISR handed to the ReplicationControlManager being built.
+ *
+ * @param defaultMinIsr The default min ISR; the builder uses 1 if this is never called.
+ * @return This builder.
+ */
+ Builder setDefaultMinIsr(int defaultMinIsr) {
+ this.defaultMinIsr = defaultMinIsr;
+ return this;
+ }
+
+ /**
+ * Set whether eligible leader replicas is enabled on the ReplicationControlManager being built.
+ *
+ * @param eligibleLeaderReplicasEnabled True to enable it; the builder uses false if this is never called.
+ * @return This builder.
+ */
+ Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
+ this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
+ return this;
+ }
+
Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
return this;
@@ -216,7 +230,9 @@ public class ReplicationControlManager {
logContext,
defaultReplicationFactor,
defaultNumPartitions,
+ defaultMinIsr,
maxElectionsPerImbalance,
+ eligibleLeaderReplicasEnabled,
configurationControl,
clusterControl,
createTopicPolicy,
@@ -279,6 +295,16 @@ public class ReplicationControlManager {
*/
private final int defaultNumPartitions;
+ /**
+ * The default min ISR, used when no min ISR config can be found for a topic.
+ */
+ private final int defaultMinIsr;
+
+ /**
+ * True if eligible leader replicas is enabled.
+ */
+ private final boolean eligibleLeaderReplicasEnabled;
+
/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
@@ -356,7 +382,9 @@ public class ReplicationControlManager {
LogContext logContext,
short defaultReplicationFactor,
int defaultNumPartitions,
+ int defaultMinIsr,
int maxElectionsPerImbalance,
+ boolean eligibleLeaderReplicasEnabled,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
Optional createTopicPolicy,
@@ -366,7 +394,9 @@ public class ReplicationControlManager {
this.log = logContext.logger(ReplicationControlManager.class);
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
+ this.defaultMinIsr = defaultMinIsr;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
+ this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.configurationControl = configurationControl;
this.createTopicPolicy = createTopicPolicy;
this.featureControl = featureControl;
@@ -761,7 +791,7 @@ public class ReplicationControlManager {
for (Entry partEntry : newParts.entrySet()) {
int partitionIndex = partEntry.getKey();
PartitionRegistration info = partEntry.getValue();
- records.add(info.toRecord(topicId, partitionIndex));
+ records.add(info.toRecord(topicId, partitionIndex, featureControl.metadataVersion().partitionRecordVersion()));
}
return ApiError.NONE;
}
@@ -947,6 +977,10 @@ public class ReplicationControlManager {
return new HashSet<>(imbalancedPartitions);
}
+    /**
+     * Report whether ELR is in effect: it must be enabled on this manager and supported by
+     * the current metadata version.
+     *
+     * @return True only if both conditions hold.
+     */
+    boolean isElrEnabled() {
+        if (!eligibleLeaderReplicasEnabled) {
+            return false;
+        }
+        return featureControl.metadataVersion().isElrSupported();
+    }
+
ControllerResult alterPartition(
ControllerRequestContext context,
AlterPartitionRequestData request
@@ -1009,9 +1043,11 @@ public class ReplicationControlManager {
topic.id,
partitionId,
clusterControl::isActive,
- featureControl.metadataVersion()
+ featureControl.metadataVersion(),
+ getTopicEffectiveMinIsr(topic.name)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+ builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1397,9 +1433,12 @@ public class ReplicationControlManager {
topicId,
partitionId,
clusterControl::isActive,
- featureControl.metadataVersion()
+ featureControl.metadataVersion(),
+ getTopicEffectiveMinIsr(topic)
);
- builder.setElection(election).setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+ builder.setElection(election)
+ .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+ builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
Optional record = builder.build();
if (!record.isPresent()) {
if (electionType == ElectionType.PREFERRED) {
@@ -1532,10 +1571,12 @@ public class ReplicationControlManager {
topicPartition.topicId(),
topicPartition.partitionId(),
clusterControl::isActive,
- featureControl.metadataVersion()
+ featureControl.metadataVersion(),
+ getTopicEffectiveMinIsr(topic.name)
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+ builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
builder.build().ifPresent(records::add);
}
@@ -1655,17 +1696,8 @@ public class ReplicationControlManager {
"Unable to replicate the partition " + replicationFactor +
" time(s): All brokers are currently fenced or in controlled shutdown.");
}
- records.add(new ApiMessageAndVersion(new PartitionRecord().
- setPartitionId(partitionId).
- setTopicId(topicId).
- setReplicas(replicas).
- setIsr(isr).
- setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
- setRemovingReplicas(Collections.emptyList()).
- setAddingReplicas(Collections.emptyList()).
- setLeader(isr.get(0)).
- setLeaderEpoch(0).
- setPartitionEpoch(0), (short) 0));
+ records.add(buildPartitionRegistration(replicas, isr)
+ .toRecord(topicId, partitionId, featureControl.metadataVersion().partitionRecordVersion()));
partitionId++;
}
}
@@ -1754,9 +1786,11 @@ public class ReplicationControlManager {
topicIdPart.topicId(),
topicIdPart.partitionId(),
isAcceptableLeader,
- featureControl.metadataVersion()
+ featureControl.metadataVersion(),
+ getTopicEffectiveMinIsr(topic.name)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+ builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1867,9 +1901,11 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
- featureControl.metadataVersion()
+ featureControl.metadataVersion(),
+ getTopicEffectiveMinIsr(topicName)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+ builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1925,9 +1961,11 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
- featureControl.metadataVersion()
+ featureControl.metadataVersion(),
+ getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString())
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+ builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
@@ -1995,6 +2033,26 @@ public class ReplicationControlManager {
setReplicas(Replicas.toList(partition.replicas)));
}
+    /**
+     * Compute the min ISR that applies to a topic, capped at the topic's replication factor.
+     * Visible for testing.
+     *
+     * The min ISR is read from the topic's MIN_IN_SYNC_REPLICAS_CONFIG value; if no value can
+     * be found, defaultMinIsr is used. The replication factor is the replica count of
+     * partition 0 of the topic; if the topic or that partition is not known,
+     * defaultReplicationFactor is used. A warning is logged for each fallback.
+     *
+     * @param topicName The name of the topic.
+     * @return          The smaller of the min ISR and the replication factor.
+     * @throws NumberFormatException if the configured min ISR is not a valid integer.
+     */
+    int getTopicEffectiveMinIsr(String topicName) {
+        int currentMinIsr = defaultMinIsr;
+        String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
+        if (minIsrConfig != null) {
+            currentMinIsr = Integer.parseInt(minIsrConfig);
+        } else {
+            log.warn("Can't find the min isr config for topic: {} using default value {}", topicName, defaultMinIsr);
+        }
+
+        // Walk topic name -> topic id -> topic -> partition 0. Any missing link yields an empty
+        // Optional, so the lookup no longer relies on catching a NullPointerException.
+        Optional<Integer> replicationFactor = Optional.ofNullable(topicsByName.get(topicName))
+            .map(topics::get)
+            .map(topic -> topic.parts.get(0))
+            .map(partition -> partition.replicas.length);
+        if (!replicationFactor.isPresent()) {
+            log.warn("Can't find the replication factor for topic: {} using default value {}",
+                topicName, defaultReplicationFactor);
+        }
+        return Math.min(currentMinIsr, replicationFactor.orElse((int) defaultReplicationFactor));
+    }
+
private static final class IneligibleReplica {
private final int replicaId;
private final String reason;
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicImage.java
index 743e37e8236..f617b8fb082 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicImage.java
@@ -68,7 +68,7 @@ public final class TopicImage {
for (Entry entry : partitions.entrySet()) {
int partitionId = entry.getKey();
PartitionRegistration partition = entry.getValue();
- writer.write(partition.toRecord(id, partitionId));
+ writer.write(partition.toRecord(id, partitionId, options.metadataVersion().partitionRecordVersion()));
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 39f47ee43e5..65d0652cd75 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -42,6 +42,8 @@ public class PartitionRegistration {
private int[] isr;
private int[] removingReplicas = Replicas.NONE;
private int[] addingReplicas = Replicas.NONE;
+ private int[] elr = Replicas.NONE;
+ private int[] lastKnownElr = Replicas.NONE;
private Integer leader;
private LeaderRecoveryState leaderRecoveryState;
private Integer leaderEpoch;
@@ -67,6 +69,16 @@ public class PartitionRegistration {
return this;
}
+ /**
+ * Set the eligible leader replicas. The builder starts with Replicas.NONE; setting null
+ * makes the later construction of the registration fail with an IllegalStateException.
+ *
+ * @param elr The ELR replica ids.
+ * @return This builder.
+ */
+ public Builder setElr(int[] elr) {
+ this.elr = elr;
+ return this;
+ }
+
+ /**
+ * Set the last known ELR. The builder starts with Replicas.NONE; setting null makes the
+ * later construction of the registration fail with an IllegalStateException.
+ *
+ * @param lastKnownElr The last known ELR replica ids.
+ * @return This builder.
+ */
+ public Builder setLastKnownElr(int[] lastKnownElr) {
+ this.lastKnownElr = lastKnownElr;
+ return this;
+ }
+
public Builder setLeader(Integer leader) {
this.leader = leader;
return this;
@@ -104,6 +116,10 @@ public class PartitionRegistration {
throw new IllegalStateException("You must set leader epoch.");
} else if (partitionEpoch == null) {
throw new IllegalStateException("You must set partition epoch.");
+ } else if (elr == null) {
+ throw new IllegalStateException("You must set ELR.");
+ } else if (lastKnownElr == null) {
+ throw new IllegalStateException("You must set last known elr.");
}
return new PartitionRegistration(
@@ -114,7 +130,9 @@ public class PartitionRegistration {
leader,
leaderRecoveryState,
leaderEpoch,
- partitionEpoch
+ partitionEpoch,
+ elr,
+ lastKnownElr
);
}
}
@@ -123,6 +141,8 @@ public class PartitionRegistration {
public final int[] isr;
public final int[] removingReplicas;
public final int[] addingReplicas;
+ public final int[] elr;
+ public final int[] lastKnownElr;
public final int leader;
public final LeaderRecoveryState leaderRecoveryState;
public final int leaderEpoch;
@@ -140,12 +160,14 @@ public class PartitionRegistration {
record.leader(),
LeaderRecoveryState.of(record.leaderRecoveryState()),
record.leaderEpoch(),
- record.partitionEpoch());
+ record.partitionEpoch(),
+ Replicas.toArray(record.eligibleLeaderReplicas()),
+ Replicas.toArray(record.lastKnownELR()));
}
private PartitionRegistration(int[] replicas, int[] isr, int[] removingReplicas,
int[] addingReplicas, int leader, LeaderRecoveryState leaderRecoveryState,
- int leaderEpoch, int partitionEpoch) {
+ int leaderEpoch, int partitionEpoch, int[] elr, int[] lastKnownElr) {
this.replicas = replicas;
this.isr = isr;
this.removingReplicas = removingReplicas;
@@ -154,6 +176,10 @@ public class PartitionRegistration {
this.leaderRecoveryState = leaderRecoveryState;
this.leaderEpoch = leaderEpoch;
this.partitionEpoch = partitionEpoch;
+
+ // We could parse a lower version record without elr/lastKnownElr.
+ this.elr = elr == null ? new int[0] : elr;
+ this.lastKnownElr = lastKnownElr == null ? new int[0] : lastKnownElr;
}
public PartitionRegistration merge(PartitionChangeRecord record) {
@@ -177,6 +203,8 @@ public class PartitionRegistration {
LeaderRecoveryState newLeaderRecoveryState = leaderRecoveryState.changeTo(record.leaderRecoveryState());
+ int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas());
+ int[] newLastKnownElr = (record.lastKnownELR() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownELR());
return new PartitionRegistration(newReplicas,
newIsr,
newRemovingReplicas,
@@ -184,7 +212,9 @@ public class PartitionRegistration {
newLeader,
newLeaderRecoveryState,
newLeaderEpoch,
- partitionEpoch + 1);
+ partitionEpoch + 1,
+ newElr,
+ newLastKnownElr);
}
public String diff(PartitionRegistration prev) {
@@ -229,6 +259,18 @@ public class PartitionRegistration {
append(prev.leaderEpoch).append(" -> ").append(leaderEpoch);
prefix = ", ";
}
+ if (!Arrays.equals(elr, prev.elr)) {
+ builder.append(prefix).append("elr: ").
+ append(Arrays.toString(prev.elr)).
+ append(" -> ").append(Arrays.toString(elr));
+ prefix = ", ";
+ }
+ if (!Arrays.equals(lastKnownElr, prev.lastKnownElr)) {
+ builder.append(prefix).append("lastKnownElr: ").
+ append(Arrays.toString(prev.lastKnownElr)).
+ append(" -> ").append(Arrays.toString(lastKnownElr));
+ prefix = ", ";
+ }
if (partitionEpoch != prev.partitionEpoch) {
builder.append(prefix).append("partitionEpoch: ").
append(prev.partitionEpoch).append(" -> ").append(partitionEpoch);
@@ -256,8 +298,8 @@ public class PartitionRegistration {
return replicas.length == 0 ? LeaderConstants.NO_LEADER : replicas[0];
}
- public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId) {
- return new ApiMessageAndVersion(new PartitionRecord().
+ public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, short version) {
+ PartitionRecord record = new PartitionRecord().
setPartitionId(partitionId).
setTopicId(topicId).
setReplicas(Replicas.toList(replicas)).
@@ -267,7 +309,12 @@ public class PartitionRegistration {
setLeader(leader).
setLeaderRecoveryState(leaderRecoveryState.value()).
setLeaderEpoch(leaderEpoch).
- setPartitionEpoch(partitionEpoch), (short) 0);
+ setPartitionEpoch(partitionEpoch);
+ if (version > 0) {
+ record.setEligibleLeaderReplicas(Replicas.toList(elr)).
+ setLastKnownELR(Replicas.toList(lastKnownElr));
+ }
+ return new ApiMessageAndVersion(record, version);
}
public LeaderAndIsrPartitionState toLeaderAndIsrPartitionState(TopicPartition tp,
@@ -290,6 +337,7 @@ public class PartitionRegistration {
@Override
public int hashCode() {
+ // Each array is hashed by content with Arrays.hashCode, matching the Arrays.equals comparisons used for equality.
return Objects.hash(Arrays.hashCode(replicas), Arrays.hashCode(isr), Arrays.hashCode(removingReplicas),
+ Arrays.hashCode(elr), Arrays.hashCode(lastKnownElr),
Arrays.hashCode(addingReplicas), leader, leaderRecoveryState, leaderEpoch, partitionEpoch);
}
@@ -301,6 +349,8 @@ public class PartitionRegistration {
Arrays.equals(isr, other.isr) &&
Arrays.equals(removingReplicas, other.removingReplicas) &&
Arrays.equals(addingReplicas, other.addingReplicas) &&
+ Arrays.equals(elr, other.elr) &&
+ Arrays.equals(lastKnownElr, other.lastKnownElr) &&
leader == other.leader &&
leaderRecoveryState == other.leaderRecoveryState &&
leaderEpoch == other.leaderEpoch &&
@@ -314,6 +364,8 @@ public class PartitionRegistration {
builder.append(", isr=").append(Arrays.toString(isr));
builder.append(", removingReplicas=").append(Arrays.toString(removingReplicas));
builder.append(", addingReplicas=").append(Arrays.toString(addingReplicas));
+ builder.append(", elr=").append(Arrays.toString(elr));
+ builder.append(", lastKnownElr=").append(Arrays.toString(lastKnownElr));
builder.append(", leader=").append(leader);
builder.append(", leaderRecoveryState=").append(leaderRecoveryState);
builder.append(", leaderEpoch=").append(leaderEpoch);
diff --git a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
index 587e512d575..9b58269b35b 100644
--- a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
@@ -17,7 +17,9 @@
"apiKey": 5,
"type": "metadata",
"name": "PartitionChangeRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
+ // Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
+ //
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
@@ -40,6 +42,12 @@
"versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
"about": "null if the adding replicas didn't change; the new adding replicas otherwise." },
{ "name": "LeaderRecoveryState", "type": "int8", "default": "-1", "versions": "0+", "taggedVersions": "0+", "tag": 5,
- "about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." }
+ "about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." },
+ { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+ "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6,
+ "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
+ { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
+ "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
+ "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }
]
}
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index fdd05f8a5ca..e65070968ed 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -17,7 +17,9 @@
"apiKey": 3,
"type": "metadata",
"name": "PartitionRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
+ // Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
+ //
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
@@ -39,6 +41,12 @@
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "The epoch of the partition leader." },
{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
- "about": "An epoch that gets incremented each time we change anything in the partition." }
+ "about": "An epoch that gets incremented each time we change anything in the partition." },
+ { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+ "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 1,
+ "about": "The eligible leader replicas of this partition." },
+ { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
+ "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2,
+ "about": "The last known eligible leader replicas of this partition." }
]
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 175202a8d63..5e76114d2e9 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -57,6 +57,7 @@ import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -136,6 +137,8 @@ public class ConfigurationControlManagerTest {
setName("def").setValue("blah"));
assertEquals(toMap(entry("abc", "x,y,z"), entry("def", "blah")),
manager.getConfigs(MYTOPIC));
+ assertEquals("x,y,z", manager.getTopicConfig(MYTOPIC.name(), "abc"));
+ assertTrue(manager.getTopicConfig(MYTOPIC.name(), "none-exists") == null);
}
@Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index c2b07956d86..019275adcc2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -38,9 +38,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.function.IntPredicate;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
-import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.controller.PartitionChangeBuilder.Election;
import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@@ -53,13 +53,17 @@ import static org.junit.jupiter.params.provider.Arguments.arguments;
@Timeout(value = 40)
public class PartitionChangeBuilderTest {
+ private static Stream partitionChangeRecordVersions() {
+ return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version));
+ }
+
@Test
public void testChangeRecordIsNoOp() {
/* If the next few checks fail please update them based on the latest schema and make sure
* to update changeRecordIsNoOp to take into account the new schema or tagged fields.
*/
// Check that the supported versions haven't changed
- assertEquals(0, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION);
+ assertEquals(1, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION);
assertEquals(0, PartitionChangeRecord.LOWEST_SUPPORTED_VERSION);
// For the latest version check that the number of tagged fields hasn't changed
TaggedFields taggedFields = (TaggedFields) PartitionChangeRecord.SCHEMA_0.get(2).def.type;
@@ -73,6 +77,10 @@ public class PartitionChangeBuilderTest {
setRemovingReplicas(Arrays.asList(1))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setAddingReplicas(Arrays.asList(4))));
+ assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+ setEligibleLeaderReplicas(Arrays.asList(5))));
+ assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+ setLastKnownELR(Arrays.asList(6))));
assertFalse(
changeRecordIsNoOp(
new PartitionChangeRecord()
@@ -92,12 +100,16 @@ public class PartitionChangeBuilderTest {
private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
- private static PartitionChangeBuilder createFooBuilder() {
- return createFooBuilder(MetadataVersion.latest());
+ private static MetadataVersion metadataVersionForPartitionChangeRecordVersion(short version) {
+ return isElrEnabled(version) ? MetadataVersion.IBP_3_7_IV1 : MetadataVersion.IBP_3_7_IV0;
}
private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataVersion) {
- return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersion);
+ return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersion, 2);
+ }
+
+ private static PartitionChangeBuilder createFooBuilder(short version) {
+ return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
@@ -113,8 +125,12 @@ public class PartitionChangeBuilderTest {
private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
- private static PartitionChangeBuilder createBarBuilder() {
- return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, MetadataVersion.latest());
+ private static boolean isElrEnabled(short partitionChangeRecordVersion) {
+ return partitionChangeRecordVersion > 0;
+ }
+
+ private static PartitionChangeBuilder createBarBuilder(short version) {
+ return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static final PartitionRegistration BAZ = new PartitionRegistration.Builder().
@@ -128,8 +144,8 @@ public class PartitionChangeBuilderTest {
private final static Uuid BAZ_ID = Uuid.fromString("wQzt5gkSTwuQNXZF5gIw7A");
- private static PartitionChangeBuilder createBazBuilder() {
- return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true, MetadataVersion.latest());
+ private static PartitionChangeBuilder createBazBuilder(short version) {
+ return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static final PartitionRegistration OFFLINE = new PartitionRegistration.Builder().
@@ -143,8 +159,8 @@ public class PartitionChangeBuilderTest {
private final static Uuid OFFLINE_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
- private static PartitionChangeBuilder createOfflineBuilder() {
- return new PartitionChangeBuilder(OFFLINE, OFFLINE_ID, 0, r -> r == 1, MetadataVersion.latest());
+ private static PartitionChangeBuilder createOfflineBuilder(short version) {
+ return new PartitionChangeBuilder(OFFLINE, OFFLINE_ID, 0, r -> r == 1, metadataVersionForPartitionChangeRecordVersion(version), 2).setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static void assertElectLeaderEquals(PartitionChangeBuilder builder,
@@ -155,29 +171,30 @@ public class PartitionChangeBuilderTest {
assertEquals(expectedUnclean, electionResult.unclean);
}
- @Test
- public void testElectLeader() {
- assertElectLeaderEquals(createFooBuilder().setElection(Election.PREFERRED), 2, false);
- assertElectLeaderEquals(createFooBuilder(), 1, false);
- assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN), 1, false);
- assertElectLeaderEquals(createFooBuilder()
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testElectLeader(short version) {
+ assertElectLeaderEquals(createFooBuilder(version).setElection(Election.PREFERRED), 2, false);
+ assertElectLeaderEquals(createFooBuilder(version), 1, false);
+ assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN), 1, false);
+ assertElectLeaderEquals(createFooBuilder(version)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 3))), 1, false);
- assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN)
+ assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 3))), 1, false);
- assertElectLeaderEquals(createFooBuilder()
+ assertElectLeaderEquals(createFooBuilder(version)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))), NO_LEADER, false);
- assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN).
+ assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN).
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))), 2, true);
assertElectLeaderEquals(
- createFooBuilder().setElection(Election.UNCLEAN)
+ createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(4))).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
4,
false
);
- assertElectLeaderEquals(createBazBuilder().setElection(Election.PREFERRED), 3, false);
- assertElectLeaderEquals(createBazBuilder(), 3, false);
- assertElectLeaderEquals(createBazBuilder().setElection(Election.UNCLEAN), 3, false);
+ assertElectLeaderEquals(createBazBuilder(version).setElection(Election.PREFERRED), 3, false);
+ assertElectLeaderEquals(createBazBuilder(version), 3, false);
+ assertElectLeaderEquals(createBazBuilder(version).setElection(Election.UNCLEAN), 3, false);
}
private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
@@ -187,13 +204,14 @@ public class PartitionChangeBuilderTest {
assertEquals(expectedLeader, record.leader());
}
- @Test
- public void testTriggerLeaderEpochBumpIfNeeded() {
- testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(),
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testTriggerLeaderEpochBumpIfNeeded(short version) {
+ testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version),
new PartitionChangeRecord(), NO_LEADER_CHANGE);
// Shrinking the ISR doesn't increase the leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder().setTargetIsrWithBrokerStates(
+ createFooBuilder(version).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
),
new PartitionChangeRecord(),
@@ -201,7 +219,7 @@ public class PartitionChangeBuilderTest {
);
// Expanding the ISR doesn't increase the leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder().setTargetIsrWithBrokerStates(
+ createFooBuilder(version).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))
),
new PartitionChangeRecord(),
@@ -209,24 +227,24 @@ public class PartitionChangeBuilderTest {
);
// Expanding the ISR during migration doesn't increase leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder()
+ createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4)))
.setZkMigrationEnabled(true),
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
- testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
+ testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
NO_LEADER_CHANGE);
- testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
+ testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
new PartitionChangeRecord().setLeader(2), 2);
// Check that the leader epoch is bump if the ISR shrinks and isSkipLeaderEpochBumpSupported is not supported.
// See KAFKA-15021 for details.
testTriggerLeaderEpochBumpIfNeededLeader(
- new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.IBP_3_5_IV2)
+ new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.IBP_3_5_IV2, 2)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
),
@@ -235,11 +253,12 @@ public class PartitionChangeBuilderTest {
);
}
- @Test
- public void testLeaderEpochBumpZkMigration() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testLeaderEpochBumpZkMigration(short version) {
// KAFKA-15109: Shrinking the ISR while in ZK migration mode requires a leader epoch bump
testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder()
+ createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
.setZkMigrationEnabled(true),
@@ -248,7 +267,7 @@ public class PartitionChangeBuilderTest {
);
testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder()
+ createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
.setZkMigrationEnabled(false),
@@ -276,17 +295,19 @@ public class PartitionChangeBuilderTest {
);
}
- @Test
- public void testNoChange() {
- assertEquals(Optional.empty(), createFooBuilder().build());
- assertEquals(Optional.empty(), createFooBuilder().setElection(Election.UNCLEAN).build());
- assertEquals(Optional.empty(), createBarBuilder().build());
- assertEquals(Optional.empty(), createBarBuilder().setElection(Election.UNCLEAN).build());
- assertEquals(Optional.empty(), createBazBuilder().setElection(Election.PREFERRED).build());
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testNoChange(short version) {
+ assertEquals(Optional.empty(), createFooBuilder(version).build());
+ assertEquals(Optional.empty(), createFooBuilder(version).setElection(Election.UNCLEAN).build());
+ assertEquals(Optional.empty(), createBarBuilder(version).build());
+ assertEquals(Optional.empty(), createBarBuilder(version).setElection(Election.UNCLEAN).build());
+ assertEquals(Optional.empty(), createBazBuilder(version).setElection(Election.PREFERRED).build());
}
- @Test
- public void testIsrChangeDoesntBumpLeaderEpoch() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testIsrChangeDoesntBumpLeaderEpoch(short version) {
// Changing the ISR should not cause the leader epoch to increase
assertEquals(
// Expected
@@ -296,11 +317,11 @@ public class PartitionChangeBuilderTest {
.setTopicId(FOO_ID)
.setPartitionId(0)
.setIsr(Arrays.asList(2, 1)),
- PARTITION_CHANGE_RECORD.highestSupportedVersion()
+ version
)
),
// Actual
- createFooBuilder()
+ createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
)
@@ -308,28 +329,31 @@ public class PartitionChangeBuilderTest {
);
}
- @Test
- public void testIsrChangeAndLeaderChange() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testIsrChangeAndLeaderChange(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setIsr(Arrays.asList(2, 3)).
- setLeader(2), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createFooBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 3))).build());
+ setLeader(2), version)),
+ createFooBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 3))).build());
}
- @Test
- public void testReassignmentRearrangesReplicas() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testReassignmentRearrangesReplicas(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(3, 2, 1)),
- PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createFooBuilder().setTargetReplicas(Arrays.asList(3, 2, 1)).build());
+ version)),
+ createFooBuilder(version).setTargetReplicas(Arrays.asList(3, 2, 1)).build());
}
- @Test
- public void testIsrEnlargementCompletesReassignment() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testIsrEnlargementCompletesReassignment(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(BAR_ID).
setPartitionId(0).
@@ -338,12 +362,13 @@ public class PartitionChangeBuilderTest {
setLeader(2).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()),
- PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createBarBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3, 4))).build());
+ version)),
+ createBarBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3, 4))).build());
}
- @Test
- public void testRevertReassignment() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testRevertReassignment(short version) {
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR);
assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
assertEquals(Arrays.asList(1, 2, 3), revert.isr());
@@ -354,8 +379,8 @@ public class PartitionChangeBuilderTest {
setLeader(1).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()),
- PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createBarBuilder().
+ version)),
+ createBarBuilder(version).
setTargetReplicas(revert.replicas()).
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(revert.isr())).
setTargetRemoving(Collections.emptyList()).
@@ -363,8 +388,9 @@ public class PartitionChangeBuilderTest {
build());
}
- @Test
- public void testRemovingReplicaReassignment() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testRemovingReplicaReassignment(short version) {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2)));
assertEquals(Collections.singletonList(3), replicas.removing());
@@ -376,15 +402,16 @@ public class PartitionChangeBuilderTest {
setReplicas(Arrays.asList(1, 2)).
setIsr(Arrays.asList(2, 1)).
setLeader(1),
- PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createFooBuilder().
+ version)),
+ createFooBuilder(version).
setTargetReplicas(replicas.replicas()).
setTargetRemoving(replicas.removing()).
build());
}
- @Test
- public void testAddingReplicaReassignment() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testAddingReplicaReassignment(short version) {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2, 3, 4)));
assertEquals(Collections.emptyList(), replicas.removing());
@@ -395,15 +422,16 @@ public class PartitionChangeBuilderTest {
setPartitionId(0).
setReplicas(Arrays.asList(1, 2, 3, 4)).
setAddingReplicas(Collections.singletonList(4)),
- PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createFooBuilder().
+ version)),
+ createFooBuilder(version).
setTargetReplicas(replicas.replicas()).
setTargetAdding(replicas.adding()).
build());
}
- @Test
- public void testUncleanLeaderElection() {
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testUncleanLeaderElection(short version) {
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(FOO_ID)
@@ -411,11 +439,11 @@ public class PartitionChangeBuilderTest {
.setIsr(Arrays.asList(2))
.setLeader(2)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()),
- PARTITION_CHANGE_RECORD.highestSupportedVersion()
+ version
);
assertEquals(
Optional.of(expectedRecord),
- createFooBuilder().setElection(Election.UNCLEAN)
+ createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))).build()
);
@@ -426,15 +454,16 @@ public class PartitionChangeBuilderTest {
.setIsr(Arrays.asList(1))
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()),
- PARTITION_CHANGE_RECORD.highestSupportedVersion()
+ version
);
assertEquals(
Optional.of(expectedRecord),
- createOfflineBuilder().setElection(Election.UNCLEAN).build()
+ createOfflineBuilder(version).setElection(Election.UNCLEAN).build()
);
+
assertEquals(
Optional.of(expectedRecord),
- createOfflineBuilder().setElection(Election.UNCLEAN)
+ createOfflineBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2))).build()
);
}
@@ -471,7 +500,8 @@ public class PartitionChangeBuilderTest {
FOO_ID,
0,
brokerId -> false,
- metadataVersion
+ metadataVersion,
+ 2
);
offlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// Set the target ISR to empty to indicate that the last leader is offline
@@ -497,7 +527,8 @@ public class PartitionChangeBuilderTest {
FOO_ID,
0,
brokerId -> true,
- metadataVersion
+ metadataVersion,
+ 2
);
onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
@@ -534,7 +565,8 @@ public class PartitionChangeBuilderTest {
FOO_ID,
0,
brokerId -> brokerId == leaderId,
- metadataVersion
+ metadataVersion,
+ 2
).setElection(Election.UNCLEAN);
onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// The partition should stay as recovering
@@ -603,7 +635,8 @@ public class PartitionChangeBuilderTest {
topicId,
0,
isValidLeader,
- leaderRecoveryMetadataVersion(false)
+ leaderRecoveryMetadataVersion(false),
+ 2
);
// Before we build the new PartitionChangeBuilder, confirm the current leader is 0.
@@ -617,7 +650,7 @@ public class PartitionChangeBuilderTest {
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(NO_LEADER),
- PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+ (short) 0)),
partitionChangeBuilder.setTargetIsr(Arrays.asList(0, 1, 2, 3)).
build());
}
@@ -629,4 +662,180 @@ public class PartitionChangeBuilderTest {
return MetadataVersion.IBP_3_1_IV0;
}
}
+
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) {
+ PartitionRegistration partition = new PartitionRegistration.Builder()
+ .setReplicas(new int[] {1, 2, 3, 4})
+ .setIsr(new int[] {1, 2, 3, 4})
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(100)
+ .setPartitionEpoch(200)
+ .build();
+ Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3)
+ .setElection(Election.PREFERRED)
+ .setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+
+ // Update ISR to {1, 2}
+ builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2)));
+
+ PartitionChangeRecord record = new PartitionChangeRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
+ .setIsr(Arrays.asList(1, 2))
+ .setLeader(-2)
+ .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
+ if (version > 0) {
+ record.setEligibleLeaderReplicas(Arrays.asList(3, 4));
+ }
+ ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
+ record,
+ version
+ );
+ assertEquals(Optional.of(expectedRecord), builder.build());
+ partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
+ if (version > 0) {
+ assertTrue(Arrays.equals(new int[]{3, 4}, partition.elr), partition.toString());
+ assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ } else {
+ assertEquals(0, partition.elr.length);
+ assertEquals(0, partition.lastKnownElr.length);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {
+ PartitionRegistration partition = new PartitionRegistration.Builder()
+ .setReplicas(new int[] {1, 2, 3, 4})
+ .setIsr(new int[] {1, 2})
+ .setElr(new int[] {3})
+ .setLastKnownElr(new int[] {4})
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(100)
+ .setPartitionEpoch(200)
+ .build();
+ Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+ // Min ISR is 3.
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3)
+ .setElection(Election.PREFERRED)
+ .setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+
+ builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3)));
+ PartitionChangeRecord record = new PartitionChangeRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
+ .setIsr(Arrays.asList(1, 2, 3))
+ .setLeader(-2)
+ .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
+
+ // Both versions will set the elr and lastKnownElr as empty list.
+ record.setEligibleLeaderReplicas(Collections.emptyList())
+ .setLastKnownELR(Collections.emptyList());
+ ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
+ record,
+ version
+ );
+ assertEquals(Optional.of(expectedRecord), builder.build());
+ partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
+ assertEquals(0, partition.elr.length);
+ assertEquals(0, partition.lastKnownElr.length);
+ }
+
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testEligibleLeaderReplicas_IsrAddNewMemberNotInELR(short version) {
+ PartitionRegistration partition = new PartitionRegistration.Builder()
+ .setReplicas(new int[] {1, 2, 3, 4})
+ .setIsr(new int[] {1})
+ .setElr(new int[] {3})
+ .setLastKnownElr(new int[] {2})
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(100)
+ .setPartitionEpoch(200)
+ .build();
+ Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+ // Min ISR is 3.
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3)
+ .setElection(Election.PREFERRED)
+ .setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+
+ builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 4)));
+ PartitionChangeRecord record = new PartitionChangeRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
+ .setIsr(Arrays.asList(1, 4))
+ .setLeader(-2)
+ .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
+ if (version == 0) {
+ record.setEligibleLeaderReplicas(Collections.emptyList());
+ record.setLastKnownELR(Collections.emptyList());
+ }
+ // No change is expected to ELR/LastKnownELR.
+ ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
+ record,
+ version
+ );
+ assertEquals(Optional.of(expectedRecord), builder.build());
+ partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
+ if (version > 0) {
+ assertTrue(Arrays.equals(new int[]{3}, partition.elr), partition.toString());
+ assertTrue(Arrays.equals(new int[]{2}, partition.lastKnownElr), partition.toString());
+ } else {
+ assertTrue(Arrays.equals(new int[]{}, partition.elr), partition.toString());
+ assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("partitionChangeRecordVersions")
+ public void testEligibleLeaderReplicas_RemoveUncleanShutdownReplicasFromElr(short version) {
+ PartitionRegistration partition = new PartitionRegistration.Builder()
+ .setReplicas(new int[] {1, 2, 3, 4})
+ .setIsr(new int[] {1})
+ .setElr(new int[] {2, 3})
+ .setLastKnownElr(new int[] {})
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(100)
+ .setPartitionEpoch(200)
+ .build();
+ Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+ // Min ISR is 3.
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3, metadataVersionForPartitionChangeRecordVersion(version), 3)
+ .setElection(Election.PREFERRED)
+ .setEligibleLeaderReplicasEnabled(isElrEnabled(version));
+
+ builder.setUncleanShutdownReplicas(Arrays.asList(3));
+
+ PartitionChangeRecord record = new PartitionChangeRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
+ .setLeader(-2)
+ .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
+ if (version > 0) {
+ record.setEligibleLeaderReplicas(Arrays.asList(2))
+ .setLastKnownELR(Arrays.asList(3));
+ } else {
+ record.setEligibleLeaderReplicas(Collections.emptyList());
+ }
+ ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
+ record,
+ version
+ );
+ assertEquals(Optional.of(expectedRecord), builder.build());
+ partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
+ if (version > 0) {
+ assertTrue(Arrays.equals(new int[]{2}, partition.elr), partition.toString());
+ assertTrue(Arrays.equals(new int[]{3}, partition.lastKnownElr), partition.toString());
+ } else {
+ assertTrue(Arrays.equals(new int[]{}, partition.elr), partition.toString());
+ assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+ }
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 3447cd45020..298cf0dece9 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -188,7 +188,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
@@ -231,7 +231,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@@ -573,7 +573,7 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV1)).
setListeners(listeners));
assertEquals(5L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 7973bb1c4a9..450ca84e0ef 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -159,6 +160,7 @@ public class ReplicationControlManagerTest {
private Optional createTopicPolicy = Optional.empty();
private MetadataVersion metadataVersion = MetadataVersion.latest();
private MockTime mockTime = new MockTime();
+ private boolean isElrEnabled = false;
Builder setCreateTopicPolicy(CreateTopicPolicy createTopicPolicy) {
this.createTopicPolicy = Optional.of(createTopicPolicy);
@@ -170,6 +172,11 @@ public class ReplicationControlManagerTest {
return this;
}
+            // Records whether eligible leader replicas (ELR) should be enabled; build() hands this
+            // flag to the ReplicationControlTestContext constructor. The parameter is a primitive
+            // so that a null can never be unboxed into the primitive isElrEnabled field.
+            Builder setIsElrEnabled(boolean isElrEnabled) {
+                this.isElrEnabled = isElrEnabled;
+                return this;
+            }
+
Builder setMockTime(MockTime mockTime) {
this.mockTime = mockTime;
return this;
@@ -178,7 +185,15 @@ public class ReplicationControlManagerTest {
ReplicationControlTestContext build() {
return new ReplicationControlTestContext(metadataVersion,
createTopicPolicy,
- mockTime);
+ mockTime,
+ isElrEnabled);
+ }
+
+ // Builds the test context with the supplied metadata version. The parameter shadows this
+ // builder's own metadataVersion field, so the field's value is not used by this overload;
+ // the policy, mock time and ELR flag still come from the builder.
+ ReplicationControlTestContext build(MetadataVersion metadataVersion) {
+ return new ReplicationControlTestContext(metadataVersion,
+ createTopicPolicy,
+ mockTime,
+ isElrEnabled);
}
}
@@ -202,7 +217,8 @@ public class ReplicationControlManagerTest {
private ReplicationControlTestContext(
MetadataVersion metadataVersion,
Optional createTopicPolicy,
- MockTime time
+ MockTime time,
+ Boolean isElrEnabled
) {
this.time = time;
this.featureControl = new FeatureControlManager.Builder().
@@ -229,6 +245,7 @@ public class ReplicationControlManagerTest {
setClusterControl(clusterControl).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
+ setEligibleLeaderReplicasEnabled(isElrEnabled).
build();
clusterControl.activate();
}
@@ -880,6 +897,91 @@ public class ReplicationControlManagerTest {
assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, expandIsrResponse);
}
+    @Test
+    public void testEligibleLeaderReplicas_ShrinkAndExpandIsr() throws Exception {
+        // One partition of "foo" on brokers 0, 1 and 2, with ELR switched on in the test context.
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
+            setIsElrEnabled(true).
+            build();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
+        int[][] assignment = new int[][] {new int[] {0, 1, 2}};
+        CreatableTopicResult created = ctx.createTestTopic("foo", assignment);
+
+        TopicIdPartition fooPartition = new TopicIdPartition(created.topicId(), 0);
+        assertEquals(OptionalInt.of(0), ctx.currentLeader(fooPartition));
+        long leaderBrokerEpoch = ctx.currentBrokerEpoch(0);
+        ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");
+
+        // Step 1: the leader shrinks the ISR to {0}, which is below the configured min ISR of 2.
+        PartitionData shrinkRequest = newAlterPartition(
+            replicationControl, fooPartition, isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
+        AlterPartitionResponseData.PartitionData shrinkResponse = assertAlterPartitionResponse(
+            sendAlterPartition(replicationControl, 0, leaderBrokerEpoch, fooPartition.topicId(), shrinkRequest),
+            fooPartition,
+            NONE);
+        assertConsistentAlterPartitionResponse(replicationControl, fooPartition, shrinkResponse);
+
+        // The replicas dropped from the ISR are expected in the ELR; last-known ELR stays empty.
+        PartitionRegistration afterShrink =
+            replicationControl.getPartition(fooPartition.topicId(), fooPartition.partitionId());
+        assertTrue(Arrays.equals(new int[]{1, 2}, afterShrink.elr), afterShrink.toString());
+        assertTrue(Arrays.equals(new int[]{}, afterShrink.lastKnownElr), afterShrink.toString());
+
+        // Step 2: the leader expands the ISR back to {0, 1}.
+        PartitionData expandRequest = newAlterPartition(
+            replicationControl, fooPartition, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
+        AlterPartitionResponseData.PartitionData expandResponse = assertAlterPartitionResponse(
+            sendAlterPartition(replicationControl, 0, leaderBrokerEpoch, fooPartition.topicId(), expandRequest),
+            fooPartition,
+            NONE);
+        assertConsistentAlterPartitionResponse(replicationControl, fooPartition, expandResponse);
+
+        // After the expansion both the ELR and the last-known ELR are expected to be empty.
+        PartitionRegistration afterExpand =
+            replicationControl.getPartition(fooPartition.topicId(), fooPartition.partitionId());
+        assertTrue(Arrays.equals(new int[]{}, afterExpand.elr), afterExpand.toString());
+        assertTrue(Arrays.equals(new int[]{}, afterExpand.lastKnownElr), afterExpand.toString());
+    }
+
+    @Test
+    public void testEligibleLeaderReplicas_BrokerFence() throws Exception {
+        // One partition of "foo" on brokers 0-3, with ELR switched on in the test context.
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
+            setIsElrEnabled(true).
+            build();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        int[][] assignment = new int[][] {new int[] {0, 1, 2, 3}};
+        CreatableTopicResult created = ctx.createTestTopic("foo", assignment);
+
+        TopicIdPartition fooPartition = new TopicIdPartition(created.topicId(), 0);
+        assertEquals(OptionalInt.of(0), ctx.currentLeader(fooPartition));
+        ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");
+
+        // Fencing brokers 2 and 3 is expected to leave only broker 3 in the ELR.
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+        PartitionRegistration afterFirstFence =
+            replicationControl.getPartition(fooPartition.topicId(), fooPartition.partitionId());
+        assertTrue(Arrays.equals(new int[]{3}, afterFirstFence.elr), afterFirstFence.toString());
+        assertTrue(Arrays.equals(new int[]{}, afterFirstFence.lastKnownElr), afterFirstFence.toString());
+
+        // Additionally fencing broker 1 is expected to grow the ELR to {1, 3}.
+        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+        PartitionRegistration afterSecondFence =
+            replicationControl.getPartition(fooPartition.topicId(), fooPartition.partitionId());
+        assertTrue(Arrays.equals(new int[]{1, 3}, afterSecondFence.elr), afterSecondFence.toString());
+        assertTrue(Arrays.equals(new int[]{}, afterSecondFence.lastKnownElr), afterSecondFence.toString());
+
+        // Unfencing every broker is expected to leave the ELR unchanged.
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        PartitionRegistration afterUnfence =
+            replicationControl.getPartition(fooPartition.topicId(), fooPartition.partitionId());
+        assertTrue(Arrays.equals(new int[]{1, 3}, afterUnfence.elr), afterUnfence.toString());
+        assertTrue(Arrays.equals(new int[]{}, afterUnfence.lastKnownElr), afterUnfence.toString());
+    }
+
+    @Test
+    public void testEligibleLeaderReplicas_EffectiveMinIsr() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
+            setIsElrEnabled(true).
+            build();
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
+        int[][] assignment = new int[][]{new int[]{0, 1, 2}};
+        CreatableTopicResult created = ctx.createTestTopic("foo", assignment);
+
+        TopicIdPartition fooPartition = new TopicIdPartition(created.topicId(), 0);
+        assertEquals(OptionalInt.of(0), ctx.currentLeader(fooPartition));
+
+        // min.insync.replicas is set to 5 while the partition has three replicas; the effective
+        // min ISR reported for the topic is expected to be 3.
+        ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "5");
+        assertEquals(3, ctx.replicationControl.getTopicEffectiveMinIsr("foo"));
+    }
+
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionHandleUnknownTopicIdOrName(short version) throws Exception {
@@ -1504,7 +1606,7 @@ public class ReplicationControlManagerTest {
setReplicas(asList(2, 1, 3)).
setLeader(3).
setRemovingReplicas(Collections.emptyList()).
- setAddingReplicas(Collections.emptyList()), (short) 0)),
+ setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
@@ -1855,7 +1957,7 @@ public class ReplicationControlManagerTest {
setLeader(4).
setReplicas(asList(2, 3, 4)).
setRemovingReplicas(null).
- setAddingReplicas(Collections.emptyList()), (short) 0)),
+ setAddingReplicas(Collections.emptyList()), MetadataVersion.latest().partitionChangeRecordVersion())),
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
@@ -1908,8 +2010,8 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
- ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
+ public void testElectUncleanLeaders_WithoutElr(boolean electAllPartitions) throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(MetadataVersion.IBP_3_6_IV1);
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@@ -2186,13 +2288,13 @@ public class ReplicationControlManagerTest {
setPartitionId(0).
setTopicId(fooId).
setLeader(1),
- (short) 0),
+ MetadataVersion.latest().partitionChangeRecordVersion()),
new ApiMessageAndVersion(
new PartitionChangeRecord().
setPartitionId(2).
setTopicId(fooId).
setLeader(0),
- (short) 0)),
+ MetadataVersion.latest().partitionChangeRecordVersion())),
election2Result.records());
}
@@ -2235,7 +2337,7 @@ public class ReplicationControlManagerTest {
.setPartitionId(0)
.setTopicId(fooId)
.setLeader(1);
- assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, (short) 0)), balanceResult.records());
+ assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latest().partitionChangeRecordVersion())), balanceResult.records());
assertTrue(replication.arePartitionLeadersImbalanced());
assertFalse(balanceResult.response());
@@ -2267,7 +2369,7 @@ public class ReplicationControlManagerTest {
.setPartitionId(2)
.setTopicId(fooId)
.setLeader(0);
- assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, (short) 0)), balanceResult.records());
+ assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latest().partitionChangeRecordVersion())), balanceResult.records());
assertFalse(replication.arePartitionLeadersImbalanced());
assertFalse(balanceResult.response());
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
index 2362629ae4a..d68460febd0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsChangesTest.java
@@ -160,15 +160,15 @@ public class ControllerMetricsChangesTest {
static {
TOPIC_DELTA1 = new TopicDelta(new TopicImage("foo", FOO_ID, Collections.emptyMap()));
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
- toRecord(FOO_ID, 0).message());
+ toRecord(FOO_ID, 0, (short) 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
- toRecord(FOO_ID, 1).message());
+ toRecord(FOO_ID, 1, (short) 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
- toRecord(FOO_ID, 2).message());
+ toRecord(FOO_ID, 2, (short) 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NON_PREFERRED_LEADER).
- toRecord(FOO_ID, 3).message());
+ toRecord(FOO_ID, 3, (short) 0).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NON_PREFERRED_LEADER).
- toRecord(FOO_ID, 4).message());
+ toRecord(FOO_ID, 4, (short) 0).message());
TOPIC_DELTA2 = new TopicDelta(TOPIC_DELTA1.apply());
TOPIC_DELTA2.replay(new PartitionChangeRecord().
@@ -176,7 +176,7 @@ public class ControllerMetricsChangesTest {
setTopicId(FOO_ID).
setLeader(1));
TOPIC_DELTA2.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
- toRecord(FOO_ID, 5).message());
+ toRecord(FOO_ID, 5, (short) 0).message());
}
@Test
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 873959965ed..5c73bf0109f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -30,9 +30,14 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -43,6 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@Timeout(40)
public class PartitionRegistrationTest {
+    /**
+     * Method source for parameterized tests: one {@link Arguments} per PartitionRecord version,
+     * from LOWEST_SUPPORTED_VERSION through HIGHEST_SUPPORTED_VERSION inclusive, each as a short.
+     */
+    private static Stream<Arguments> partitionRecordVersions() {
+        return IntStream.rangeClosed(PartitionRecord.LOWEST_SUPPORTED_VERSION, PartitionRecord.HIGHEST_SUPPORTED_VERSION)
+            .mapToObj(version -> Arguments.of((short) version));
+    }
@Test
public void testElectionWasClean() {
assertTrue(PartitionRegistration.electionWasClean(1, new int[]{1, 2}));
@@ -58,12 +66,12 @@ public class PartitionRegistrationTest {
PartitionRegistration b = new PartitionRegistration.Builder().
setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{3}).setLeader(3).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(1).build();
PartitionRegistration c = new PartitionRegistration.Builder().
- setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
+ setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1}).setLastKnownElr(new int[]{3}).setElr(new int[]{2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
assertEquals(b, a.merge(new PartitionChangeRecord().
setLeader(3).setIsr(Arrays.asList(3))));
assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, partitionEpoch: 0 -> 1",
b.diff(a));
- assertEquals("isr: [1, 2] -> [1], partitionEpoch: 0 -> 1",
+ assertEquals("isr: [1, 2] -> [1], elr: [] -> [2], lastKnownElr: [] -> [3], partitionEpoch: 0 -> 1",
c.diff(a));
}
@@ -73,7 +81,7 @@ public class PartitionRegistrationTest {
setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setRemovingReplicas(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
int partitionId = 4;
- ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId);
+ ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId, (short) 0);
PartitionRegistration registrationB =
new PartitionRegistration((PartitionRecord) record.message());
assertEquals(registrationA, registrationB);
@@ -200,8 +208,10 @@ public class PartitionRegistrationTest {
@Test
public void testBuilderSuccess() {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
- setReplicas(new int[]{0, 1}).
+ setReplicas(new int[]{0, 1, 2}).
setIsr(new int[]{0, 1}).
+ setElr(new int[]{2}).
+ setLastKnownElr(new int[]{0, 1, 2}).
setRemovingReplicas(new int[]{0}).
setAddingReplicas(new int[]{1}).
setLeader(0).
@@ -209,8 +219,10 @@ public class PartitionRegistrationTest {
setLeaderEpoch(0).
setPartitionEpoch(0);
PartitionRegistration partitionRegistration = builder.build();
- assertEquals(Replicas.toList(new int[]{0, 1}), Replicas.toList(partitionRegistration.replicas));
+ assertEquals(Replicas.toList(new int[]{0, 1, 2}), Replicas.toList(partitionRegistration.replicas));
assertEquals(Replicas.toList(new int[]{0, 1}), Replicas.toList(partitionRegistration.isr));
+ assertEquals(Replicas.toList(new int[]{2}), Replicas.toList(partitionRegistration.elr));
+ assertEquals(Replicas.toList(new int[]{0, 1, 2}), Replicas.toList(partitionRegistration.lastKnownElr));
assertEquals(Replicas.toList(new int[]{0}), Replicas.toList(partitionRegistration.removingReplicas));
assertEquals(Replicas.toList(new int[]{1}), Replicas.toList(partitionRegistration.addingReplicas));
assertEquals(0, partitionRegistration.leader);
@@ -233,17 +245,49 @@ public class PartitionRegistrationTest {
assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
}
+    @ParameterizedTest
+    @MethodSource("partitionRecordVersions")
+    public void testPartitionRegistrationToRecord(short version) {
+        // Registration with ELR {2, 3} and last-known ELR {4}; it is serialized at every
+        // PartitionRecord version supplied by partitionRecordVersions.
+        PartitionRegistration partitionRegistration = new PartitionRegistration.Builder().
+            setReplicas(new int[]{0, 1, 2, 3, 4}).
+            setIsr(new int[]{0, 1}).
+            setLeader(0).
+            setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
+            setLeaderEpoch(0).
+            setPartitionEpoch(0).
+            setElr(new int[]{2, 3}).
+            setLastKnownElr(new int[]{4}).
+            build();
+        Uuid topicID = Uuid.randomUuid();
+        PartitionRecord expectRecord = new PartitionRecord().
+            setTopicId(topicID).
+            setPartitionId(0).
+            setReplicas(Arrays.asList(0, 1, 2, 3, 4)).
+            setIsr(Arrays.asList(0, 1)).
+            setLeader(0).
+            setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
+            setLeaderEpoch(0).
+            setPartitionEpoch(0);
+        if (version > 0) {
+            // The ELR fields are only expected in the record for versions above 0.
+            expectRecord.
+                setEligibleLeaderReplicas(Arrays.asList(2, 3)).
+                setLastKnownELR(Arrays.asList(4));
+        }
+        assertEquals(new ApiMessageAndVersion(expectRecord, version), partitionRegistration.toRecord(topicID, 0, version));
+        assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
+    }
+
@Property
public void testConsistentEqualsAndHashCode(
@ForAll("uniqueSamples") PartitionRegistration a,
@ForAll("uniqueSamples") PartitionRegistration b
) {
+ // For each pair drawn from uniqueSamples: equal registrations must share a hash code, and
+ // differing hash codes must mean the registrations are not equal. Both samples go in the message.
if (a.equals(b)) {
- assertEquals(a.hashCode(), b.hashCode());
+ assertEquals(a.hashCode(), b.hashCode(), "a=" + a + "\nb=" + b);
}
if (a.hashCode() != b.hashCode()) {
- assertNotEquals(a, b);
+ assertNotEquals(a, b, "a=" + a + "\nb=" + b);
}
}
@@ -251,24 +295,23 @@ public class PartitionRegistrationTest {
Arbitrary uniqueSamples() {
return Arbitraries.of(
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
- setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
+ setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
- setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(101).setPartitionEpoch(200).build(),
+ setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(101).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
- setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(201).build(),
+ setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(201).setElr(new int[] {1, 2}).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}).
- setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
+ setLeader(2).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERING).setLeaderEpoch(100).setPartitionEpoch(200).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {4, 5, 6}).setAddingReplicas(new int[] {1, 2, 3}).
- setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
+ setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
- setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
- new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 2, 3}).
- setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build(),
+ setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setLastKnownElr(new int[] {1, 2}).build(),
+ new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setRemovingReplicas(new int[] {1, 3}).
+ setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(),
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
- setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build()
+ setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {2, 3}).setLastKnownElr(new int[] {1, 2}).build()
);
}
-
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 44764e9c641..f7ca2fc242f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -186,7 +186,10 @@ public enum MetadataVersion {
IBP_3_6_IV2(14, "3.6", "IV2", true),
// Implement KIP-919 controller registration.
- IBP_3_7_IV0(15, "3.7", "IV0", true);
+ IBP_3_7_IV0(15, "3.7", "IV0", true),
+
+ // Add support for eligible leader replicas (ELR), KIP-966.
+ IBP_3_7_IV1(16, "3.7", "IV1", true);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the latest version
@@ -286,6 +289,10 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_6_IV2);
}
+    /**
+     * @return true when this metadata version is at least IBP_3_7_IV1.
+     */
+    public boolean isElrSupported() {
+        return isAtLeast(MetadataVersion.IBP_3_7_IV1);
+    }
+
public boolean isKRaftSupported() {
return this.featureLevel > 0;
}
@@ -336,6 +343,22 @@ public enum MetadataVersion {
return this.isAtLeast(MetadataVersion.IBP_3_7_IV0);
}
+    /**
+     * @return the PartitionChangeRecord version for this metadata version: 1 when
+     *         {@link #isElrSupported()} is true, otherwise 0.
+     */
+    public short partitionChangeRecordVersion() {
+        return isElrSupported() ? (short) 1 : (short) 0;
+    }
+
+    /**
+     * @return the PartitionRecord version for this metadata version: 1 when
+     *         {@link #isElrSupported()} is true, otherwise 0.
+     */
+    public short partitionRecordVersion() {
+        return isElrSupported() ? (short) 1 : (short) 0;
+    }
+
public short fetchRequestVersion() {
if (this.isAtLeast(IBP_3_5_IV1)) {
return 15;
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 27028b86cfe..2ed6e6cdb31 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -164,6 +164,9 @@ class MetadataVersionTest {
assertEquals(IBP_3_6_IV0, MetadataVersion.fromVersionString("3.6-IV0"));
assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1"));
assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2"));
+
+ assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0"));
+ assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1"));
}
@Test
@@ -218,6 +221,8 @@ class MetadataVersionTest {
assertEquals("3.6", IBP_3_6_IV0.shortVersion());
assertEquals("3.6", IBP_3_6_IV1.shortVersion());
assertEquals("3.6", IBP_3_6_IV2.shortVersion());
+ assertEquals("3.7", IBP_3_7_IV0.shortVersion());
+ assertEquals("3.7", IBP_3_7_IV1.shortVersion());
}
@Test
@@ -261,6 +266,8 @@ class MetadataVersionTest {
assertEquals("3.6-IV0", IBP_3_6_IV0.version());
assertEquals("3.6-IV1", IBP_3_6_IV1.version());
assertEquals("3.6-IV2", IBP_3_6_IV2.version());
+ assertEquals("3.7-IV0", IBP_3_7_IV0.version());
+ assertEquals("3.7-IV1", IBP_3_7_IV1.version());
}
@Test
@@ -319,6 +326,17 @@ class MetadataVersionTest {
metadataVersion.isDelegationTokenSupported());
}
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testIsElrSupported(MetadataVersion metadataVersion) {
+        // isElrSupported() is implemented with isAtLeast(IBP_3_7_IV1), so the expectation must be
+        // a floor as well; comparing with equals() would fail for any version added after IV1.
+        boolean expectElr = metadataVersion.isAtLeast(IBP_3_7_IV1);
+        assertEquals(expectElr, metadataVersion.isElrSupported());
+
+        // Both record versions are 1 when ELR is supported and 0 otherwise.
+        short expectedRecordVersion = expectElr ? (short) 1 : (short) 0;
+        assertEquals(expectedRecordVersion, metadataVersion.partitionRecordVersion());
+        assertEquals(expectedRecordVersion, metadataVersion.partitionChangeRecordVersion());
+    }
+
@ParameterizedTest
@EnumSource(value = MetadataVersion.class)
public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 9258a1ee026..6e38af8956e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -68,7 +68,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
- "SupportedMaxVersion: 3.7-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
+ "SupportedMaxVersion: 3.7-IV1\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
}
@ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV0)
@@ -78,7 +78,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
- "SupportedMaxVersion: 3.7-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput));
+ "SupportedMaxVersion: 3.7-IV1\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput));
}
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
@@ -137,7 +137,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
- "metadata.version. Local controller 3000 only supports versions 1-15", commandOutput);
+ "metadata.version. Local controller 3000 only supports versions 1-16", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),