Browse Source

KAFKA-15021; Skip leader epoch bump on ISR shrink (#13765)

When the KRaft controller removes a replica from the ISR because of the controlled shutdown there is no need for the leader epoch to be increased by the KRaft controller. This is accurate as long as the topic partition leader doesn't add the removed replica back to the ISR.

This change also fixes a bug when computing the HWM. When computing the HWM, replicas that are not eligible to join the ISR but are caught up should not be included in the computation. Otherwise, the HWM will never increase for replica.lag.time.max.ms because the shutting down replica is not sending FETCH request. Without this additional fix PRODUCE requests would timeout if the request timeout is greater than replica.lag.time.max.ms.

Because of the bug above the KRaft controller needs to check the MV to guarantee that all brokers support this bug fix before skipping the leader epoch bump.

Reviewers: David Mao <47232755+splett2@users.noreply.github.com>, Divij Vaidya <diviv@amazon.com>, David Jacot <djacot@confluent.io>
pull/13530/head
José Armando García Sancio 1 year ago committed by GitHub
parent
commit
8ad0ed3e61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      core/src/main/scala/kafka/cluster/Partition.scala
  2. 2
      core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
  3. 2
      core/src/test/java/kafka/test/annotation/ClusterTest.java
  4. 57
      core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
  5. 104
      core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
  6. 42
      metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
  7. 29
      metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
  8. 102
      metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
  9. 8
      metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
  10. 22
      metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
  11. 11
      server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
  12. 4
      tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java

24
core/src/main/scala/kafka/cluster/Partition.scala

@ -1067,12 +1067,12 @@ class Partition(val topicPartition: TopicPartition, @@ -1067,12 +1067,12 @@ class Partition(val topicPartition: TopicPartition,
* 1. Partition ISR changed
* 2. Any replica's LEO changed
*
* The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up.
* This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this
* replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the
* leader replica and a follower tries to catch up. If we don't wait for the follower when advancing the HW, the
* follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore
* will never be added to ISR.
* The HW is determined by the smallest log end offset among all replicas that are in sync; or are considered caught-up
* and are allowed to join the ISR. This way, if a replica is considered caught-up, but its log end offset is smaller
* than HW, we will wait for this replica to catch up to the HW before advancing the HW. This helps the situation when
* the ISR only includes the leader replica and a follower tries to catch up. If we don't wait for the follower when
* advancing the HW, the follower's log end offset may keep falling behind the HW (determined by the leader's log end
* offset) and therefore will never be added to ISR.
*
* With the addition of AlterPartition, we also consider newly added replicas as part of the ISR when advancing
* the HW. These replicas have not yet been committed to the ISR by the controller, so we could revert to the previously
@ -1089,11 +1089,17 @@ class Partition(val topicPartition: TopicPartition, @@ -1089,11 +1089,17 @@ class Partition(val topicPartition: TopicPartition,
val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
var newHighWatermark = leaderLogEndOffset
remoteReplicasMap.values.foreach { replica =>
// Note here we are using the "maximal", see explanation above
val replicaState = replica.stateSnapshot
def shouldWaitForReplicaToJoinIsr: Boolean = {
replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) &&
isReplicaIsrEligible(replica.brokerId)
}
// Note here we are using the "maximal", see explanation above
if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
(replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs)
|| partitionState.maximalIsr.contains(replica.brokerId))) {
(partitionState.maximalIsr.contains(replica.brokerId) || shouldWaitForReplicaToJoinIsr)
) {
newHighWatermark = replicaState.logEndOffsetMetadata
}
}

2
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java

@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest { @@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testDefaults(ClusterConfig config) {
Assertions.assertEquals(MetadataVersion.IBP_3_5_IV2, config.metadataVersion());
Assertions.assertEquals(MetadataVersion.IBP_3_6_IV0, config.metadataVersion());
}
}

2
core/src/test/java/kafka/test/annotation/ClusterTest.java

@ -41,6 +41,6 @@ public @interface ClusterTest { @@ -41,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV2;
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_6_IV0;
ClusterConfigProperty[] serverProperties() default {};
}

57
core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala

@ -48,8 +48,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @@ -48,8 +48,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
val numServers = 2
overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, false, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
TestUtils.createBrokerConfigs(
numServers,
zkConnectOrNull,
enableControlledShutdown = true,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile,
saslProperties = serverSaslProperties
).map(KafkaConfig.fromProps(_, overridingProps))
}
private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
@ -357,6 +363,53 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @@ -357,6 +363,53 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSendToPartitionWithFollowerShutdownShouldNotTimeout(quorum: String): Unit = {
// This test produces to a leader that has follower that is shutting down. It shows that
// the produce request succeed, do not timeout and do not need to be retried.
val producer = createProducer()
val follower = 1
val replicas = List(0, follower)
try {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 3, Map(0 -> replicas))
val partition = 0
val now = System.currentTimeMillis()
val futures = (1 to numRecords).map { i =>
producer.send(new ProducerRecord(topic, partition, now, null, ("value" + i).getBytes(StandardCharsets.UTF_8)))
}
// Shutdown the follower
killBroker(follower)
// make sure all of them end up in the same partition with increasing offset values
futures.zip(0 until numRecords).foreach { case (future, offset) =>
val recordMetadata = future.get(30, TimeUnit.SECONDS)
assertEquals(offset.toLong, recordMetadata.offset)
assertEquals(topic, recordMetadata.topic)
assertEquals(partition, recordMetadata.partition)
}
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
// make sure the fetched messages also respect the partitioning and ordering
val records = TestUtils.consumeRecords(consumer, numRecords)
records.zipWithIndex.foreach { case (record, i) =>
assertEquals(topic, record.topic)
assertEquals(partition, record.partition)
assertEquals(i.toLong, record.offset)
assertNull(record.key)
assertEquals(s"value${i + 1}", new String(record.value))
assertEquals(now, record.timestamp)
}
} finally {
producer.close()
}
}
/**
* Checks partitioning behavior before and after partitions are added
*

104
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

@ -1184,7 +1184,7 @@ class PartitionTest extends AbstractPartitionTest { @@ -1184,7 +1184,7 @@ class PartitionTest extends AbstractPartitionTest {
builder.build()
}
def createIdempotentRecords(records: Iterable[SimpleRecord],
def createIdempotentRecords(records: Iterable[SimpleRecord],
baseOffset: Long,
baseSequence: Int = 0,
producerId: Long = 1L): MemoryRecords = {
@ -1456,6 +1456,108 @@ class PartitionTest extends AbstractPartitionTest { @@ -1456,6 +1456,108 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(alterPartitionListener.failures.get, 1)
}
@ParameterizedTest
@ValueSource(strings = Array("fenced", "shutdown", "unfenced"))
def testHighWatermarkIncreasesWithFencedOrShutdownFollower(brokerState: String): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List(brokerId, remoteBrokerId)
val shrinkedIsr = Set(brokerId)
val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager
)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(replicas.map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false),
offsetCheckpoints,
None
),
"Expected become leader transition to succeed"
)
assertEquals(replicas.toSet, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
// Fetch to let the follower catch up to the log end offset
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
// Follower fetches and catches up to the log end offset.
assertReplicaState(
partition,
remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = log.logEndOffset
)
// Check that the leader updated the HWM to the LEO which is what the follower has
assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark)
if (brokerState == "fenced") {
when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
} else if (brokerState == "shutdown") {
when(metadataCache.isBrokerShuttingDown(remoteBrokerId)).thenReturn(true)
}
// Append records to the log as leader of the current epoch
seedLogData(log, numRecords = 10, leaderEpoch)
// Controller shrinks the ISR after
assertFalse(
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(shrinkedIsr.toList.map(Int.box).asJava)
.setPartitionEpoch(2)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false),
offsetCheckpoints,
None
),
"Expected to stay leader"
)
assertTrue(partition.isLeader)
assertEquals(shrinkedIsr, partition.partitionState.isr)
assertEquals(shrinkedIsr, partition.partitionState.maximalIsr)
assertEquals(Set.empty, partition.getOutOfSyncReplicas(partition.replicaLagTimeMaxMs))
// In the case of unfenced, the HWM doesn't increase, otherwise the the HWM increases because the
// fenced and shutdown replica is not considered during HWM calculation.
if (brokerState == "unfenced") {
assertEquals(10, partition.localLogOrException.highWatermark)
} else {
assertEquals(20, partition.localLogOrException.highWatermark)
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {

42
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java

@ -30,6 +30,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState; @@ -30,6 +30,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@ -73,7 +74,7 @@ public class PartitionChangeBuilder { @@ -73,7 +74,7 @@ public class PartitionChangeBuilder {
private final Uuid topicId;
private final int partitionId;
private final IntPredicate isAcceptableLeader;
private final boolean isLeaderRecoverySupported;
private final MetadataVersion metadataVersion;
private List<Integer> targetIsr;
private List<Integer> targetReplicas;
private List<Integer> targetRemoving;
@ -81,16 +82,18 @@ public class PartitionChangeBuilder { @@ -81,16 +82,18 @@ public class PartitionChangeBuilder {
private Election election = Election.ONLINE;
private LeaderRecoveryState targetLeaderRecoveryState;
public PartitionChangeBuilder(PartitionRegistration partition,
Uuid topicId,
int partitionId,
IntPredicate isAcceptableLeader,
boolean isLeaderRecoverySupported) {
public PartitionChangeBuilder(
PartitionRegistration partition,
Uuid topicId,
int partitionId,
IntPredicate isAcceptableLeader,
MetadataVersion metadataVersion
) {
this.partition = partition;
this.topicId = topicId;
this.partitionId = partitionId;
this.isAcceptableLeader = isAcceptableLeader;
this.isLeaderRecoverySupported = isLeaderRecoverySupported;
this.metadataVersion = metadataVersion;
this.targetIsr = Replicas.toList(partition.isr);
this.targetReplicas = Replicas.toList(partition.replicas);
this.targetRemoving = Replicas.toList(partition.removingReplicas);
@ -104,9 +107,12 @@ public class PartitionChangeBuilder { @@ -104,9 +107,12 @@ public class PartitionChangeBuilder {
}
public PartitionChangeBuilder setTargetIsrWithBrokerStates(List<BrokerState> targetIsrWithEpoch) {
this.targetIsr = targetIsrWithEpoch.stream()
.map(brokerState -> brokerState.brokerId()).collect(Collectors.toList());
return this;
return setTargetIsr(
targetIsrWithEpoch
.stream()
.map(brokerState -> brokerState.brokerId())
.collect(Collectors.toList())
);
}
public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) {
@ -236,7 +242,7 @@ public class PartitionChangeBuilder { @@ -236,7 +242,7 @@ public class PartitionChangeBuilder {
// new leader. This can result in data loss!
record.setIsr(Collections.singletonList(electionResult.node));
if (partition.leaderRecoveryState != LeaderRecoveryState.RECOVERING &&
isLeaderRecoverySupported) {
metadataVersion.isLeaderRecoverySupported()) {
// And mark the leader recovery state as RECOVERING
record.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value());
}
@ -251,8 +257,7 @@ public class PartitionChangeBuilder { @@ -251,8 +257,7 @@ public class PartitionChangeBuilder {
*
* We need to bump the leader epoch if:
* 1. The leader changed, or
* 2. The new ISR does not contain all the nodes that the old ISR did, or
* 3. The new replica list does not contain all the nodes that the old replica list did.
* 2. The new replica list does not contain all the nodes that the old replica list did.
*
* Changes that do NOT fall in any of these categories will increase the partition epoch, but
* not the leader epoch. Note that if the leader epoch increases, the partition epoch will
@ -263,11 +268,18 @@ public class PartitionChangeBuilder { @@ -263,11 +268,18 @@ public class PartitionChangeBuilder {
* NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
* case 1. In this function, we check for cases 2 and 3, and handle them by manually
* setting record.leader to the current leader.
*
* In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
* that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader
* bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if
* the ISR expanded.
*/
void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
if (record.leader() == NO_LEADER_CHANGE) {
if (!Replicas.contains(targetIsr, partition.isr) ||
!Replicas.contains(targetReplicas, partition.replicas)) {
if (!Replicas.contains(targetReplicas, partition.replicas)) {
record.setLeader(partition.leader);
} else if (!metadataVersion.isSkipLeaderEpochBumpSupported() &&
!Replicas.contains(targetIsr, partition.isr)) {
record.setLeader(partition.leader);
}
}

29
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@ -993,7 +993,8 @@ public class ReplicationControlManager { @@ -993,7 +993,8 @@ public class ReplicationControlManager {
topic.id,
partitionId,
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@ -1374,11 +1375,13 @@ public class ReplicationControlManager { @@ -1374,11 +1375,13 @@ public class ReplicationControlManager {
if (electionType == ElectionType.UNCLEAN) {
election = PartitionChangeBuilder.Election.UNCLEAN;
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
PartitionChangeBuilder builder = new PartitionChangeBuilder(
partition,
topicId,
partitionId,
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
builder.setElection(election);
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
@ -1512,7 +1515,7 @@ public class ReplicationControlManager { @@ -1512,7 +1515,7 @@ public class ReplicationControlManager {
topicPartition.topicId(),
topicPartition.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported()
featureControl.metadataVersion()
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
builder.build().ifPresent(records::add);
@ -1728,11 +1731,13 @@ public class ReplicationControlManager { @@ -1728,11 +1731,13 @@ public class ReplicationControlManager {
throw new RuntimeException("Partition " + topicIdPart +
" existed in isrMembers, but not in the partitions map.");
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
PartitionChangeBuilder builder = new PartitionChangeBuilder(
partition,
topicIdPart.topicId(),
topicIdPart.partitionId(),
isAcceptableLeader,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@ -1838,11 +1843,13 @@ public class ReplicationControlManager { @@ -1838,11 +1843,13 @@ public class ReplicationControlManager {
"it would require an unclean leader election.");
}
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
PartitionChangeBuilder builder = new PartitionChangeBuilder(
part,
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@ -1893,11 +1900,13 @@ public class ReplicationControlManager { @@ -1893,11 +1900,13 @@ public class ReplicationControlManager {
List<Integer> currentReplicas = Replicas.toList(part.replicas);
PartitionReassignmentReplicas reassignment =
new PartitionReassignmentReplicas(currentAssignment, targetAssignment);
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
PartitionChangeBuilder builder = new PartitionChangeBuilder(
part,
tp.topicId(),
tp.partitionId(),
clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
featureControl.metadataVersion()
);
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}

102
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java

@ -27,6 +27,7 @@ import org.apache.kafka.metadata.PartitionRegistration; @@ -27,6 +27,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
@ -89,7 +90,7 @@ public class PartitionChangeBuilderTest { @@ -89,7 +90,7 @@ public class PartitionChangeBuilderTest {
private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
private static PartitionChangeBuilder createFooBuilder() {
return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, true);
return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.latest());
}
private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
@ -106,7 +107,7 @@ public class PartitionChangeBuilderTest { @@ -106,7 +107,7 @@ public class PartitionChangeBuilderTest {
private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
private static PartitionChangeBuilder createBarBuilder() {
return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, true);
return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, MetadataVersion.latest());
}
private static final PartitionRegistration BAZ = new PartitionRegistration.Builder().
@ -121,7 +122,7 @@ public class PartitionChangeBuilderTest { @@ -121,7 +122,7 @@ public class PartitionChangeBuilderTest {
private final static Uuid BAZ_ID = Uuid.fromString("wQzt5gkSTwuQNXZF5gIw7A");
private static PartitionChangeBuilder createBazBuilder() {
return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true, true);
return new PartitionChangeBuilder(BAZ, BAZ_ID, 0, __ -> true, MetadataVersion.latest());
}
private static final PartitionRegistration OFFLINE = new PartitionRegistration.Builder().
@ -136,7 +137,7 @@ public class PartitionChangeBuilderTest { @@ -136,7 +137,7 @@ public class PartitionChangeBuilderTest {
private final static Uuid OFFLINE_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
private static PartitionChangeBuilder createOfflineBuilder() {
return new PartitionChangeBuilder(OFFLINE, OFFLINE_ID, 0, r -> r == 1, true);
return new PartitionChangeBuilder(OFFLINE, OFFLINE_ID, 0, r -> r == 1, MetadataVersion.latest());
}
private static void assertElectLeaderEquals(PartitionChangeBuilder builder,
@ -183,19 +184,39 @@ public class PartitionChangeBuilderTest { @@ -183,19 +184,39 @@ public class PartitionChangeBuilderTest {
public void testTriggerLeaderEpochBumpIfNeeded() {
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(),
new PartitionChangeRecord(), NO_LEADER_CHANGE);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(), 1);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
// Shrinking the ISR doesn't increase the leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder().setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
),
new PartitionChangeRecord(),
NO_LEADER_CHANGE);
NO_LEADER_CHANGE
);
// Expanding the ISR doesn't increase the leader epoch
testTriggerLeaderEpochBumpIfNeededLeader(
createFooBuilder().setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))
),
new PartitionChangeRecord(),
NO_LEADER_CHANGE
);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
NO_LEADER_CHANGE);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
new PartitionChangeRecord().setLeader(2), 2);
// Check that the leader epoch is bump if the ISR shrinks and isSkipLeaderEpochBumpSupported is not supported.
// See KAFKA-15021 for details.
testTriggerLeaderEpochBumpIfNeededLeader(
new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.IBP_3_5_IV2)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
),
new PartitionChangeRecord(),
1
);
}
@Test
@ -208,13 +229,26 @@ public class PartitionChangeBuilderTest { @@ -208,13 +229,26 @@ public class PartitionChangeBuilderTest {
}
@Test
public void testIsrChangeAndLeaderBump() {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setIsr(Arrays.asList(2, 1)).
setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createFooBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))).build());
public void testIsrChangeDoesntBumpLeaderEpoch() {
// Changing the ISR should not cause the leader epoch to increase
assertEquals(
// Expected
Optional.of(
new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(FOO_ID)
.setPartitionId(0)
.setIsr(Arrays.asList(2, 1)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
)
),
// Actual
createFooBuilder()
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
)
.build()
);
}
@Test
@ -363,13 +397,15 @@ public class PartitionChangeBuilderTest { @@ -363,13 +397,15 @@ public class PartitionChangeBuilderTest {
setPartitionEpoch(200).
build();
MetadataVersion metadataVersion = leaderRecoveryMetadataVersion(isLeaderRecoverySupported);
// Change the partition so that there is no leader
PartitionChangeBuilder offlineBuilder = new PartitionChangeBuilder(
registration,
FOO_ID,
0,
brokerId -> false,
isLeaderRecoverySupported
metadataVersion
);
// Set the target ISR to empty to indicate that the last leader is offline
offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
@ -394,7 +430,7 @@ public class PartitionChangeBuilderTest { @@ -394,7 +430,7 @@ public class PartitionChangeBuilderTest {
FOO_ID,
0,
brokerId -> true,
isLeaderRecoverySupported
metadataVersion
);
// The only broker in the ISR is elected leader and stays in the recovering
@ -422,15 +458,16 @@ public class PartitionChangeBuilderTest { @@ -422,15 +458,16 @@ public class PartitionChangeBuilderTest {
setPartitionEpoch(200).
build();
MetadataVersion metadataVersion = leaderRecoveryMetadataVersion(isLeaderRecoverySupported);
// Change the partition using unclean leader election
PartitionChangeBuilder onlineBuilder = new PartitionChangeBuilder(
registration,
FOO_ID,
0,
brokerId -> brokerId == leaderId,
isLeaderRecoverySupported
metadataVersion
).setElection(Election.UNCLEAN);
// The partition should stay as recovering
PartitionChangeRecord changeRecord = (PartitionChangeRecord) onlineBuilder
@ -493,14 +530,13 @@ public class PartitionChangeBuilderTest { @@ -493,14 +530,13 @@ public class PartitionChangeBuilderTest {
// being stopped as leader.
IntPredicate isValidLeader = l -> false;
PartitionChangeBuilder partitionChangeBuilder =
new PartitionChangeBuilder(
part,
topicId,
0,
isValidLeader,
false
);
PartitionChangeBuilder partitionChangeBuilder = new PartitionChangeBuilder(
part,
topicId,
0,
isValidLeader,
leaderRecoveryMetadataVersion(false)
);
// Before we build the new PartitionChangeBuilder, confirm the current leader is 0.
assertEquals(0, part.leader);
@ -518,5 +554,11 @@ public class PartitionChangeBuilderTest { @@ -518,5 +554,11 @@ public class PartitionChangeBuilderTest {
build());
}
private MetadataVersion leaderRecoveryMetadataVersion(boolean isSupported) {
if (isSupported) {
return MetadataVersion.IBP_3_2_IV0;
} else {
return MetadataVersion.IBP_3_1_IV0;
}
}
}

8
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java

@ -189,7 +189,7 @@ public class QuorumControllerTest { @@ -189,7 +189,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
@ -230,7 +230,7 @@ public class QuorumControllerTest { @@ -230,7 +230,7 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@ -567,7 +567,7 @@ public class QuorumControllerTest { @@ -567,7 +567,7 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)).
setListeners(listeners));
assertEquals(3L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
@ -988,7 +988,7 @@ public class QuorumControllerTest { @@ -988,7 +988,7 @@ public class QuorumControllerTest {
.setBrokerId(brokerId)
.setRack(null)
.setClusterId(controller.clusterId())
.setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2))
.setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
.setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
.setListeners(
new ListenerCollection(

22
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

@ -1572,7 +1572,7 @@ public class ReplicationControlManagerTest { @@ -1572,7 +1572,7 @@ public class ReplicationControlManagerTest {
setIsr(new int[] {1, 2, 4}).
setLeader(1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(1).
setLeaderEpoch(0).
setPartitionEpoch(1).
build(),
replication.getPartition(fooId, 0));
@ -1586,7 +1586,7 @@ public class ReplicationControlManagerTest { @@ -1586,7 +1586,7 @@ public class ReplicationControlManagerTest {
.setPartitions(asList(new PartitionData()
.setPartitionIndex(0)
.setPartitionEpoch(1)
.setLeaderEpoch(1)
.setLeaderEpoch(0)
.setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
ControllerRequestContext requestContext =
@ -1620,7 +1620,7 @@ public class ReplicationControlManagerTest { @@ -1620,7 +1620,7 @@ public class ReplicationControlManagerTest {
.setPartitions(asList(new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(0)
.setLeaderId(1)
.setLeaderEpoch(1)
.setLeaderEpoch(0)
.setIsr(asList(1, 2, 3, 4))
.setPartitionEpoch(2)
.setErrorCode(NONE.code()))))),
@ -1650,7 +1650,7 @@ public class ReplicationControlManagerTest { @@ -1650,7 +1650,7 @@ public class ReplicationControlManagerTest {
setPartitions(asList(new PartitionData().
setPartitionIndex(0).
setPartitionEpoch(1).
setLeaderEpoch(1).
setLeaderEpoch(0).
setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
// The broker 4 has failed silently and now registers again.
@ -1767,7 +1767,7 @@ public class ReplicationControlManagerTest { @@ -1767,7 +1767,7 @@ public class ReplicationControlManagerTest {
replication.handleBrokerFenced(3, fenceRecords);
ctx.replay(fenceRecords);
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4}).setIsr(new int[] {1, 2, 4}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(1).build(), replication.getPartition(fooId, 0));
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build(), replication.getPartition(fooId, 0));
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(asList(
@ -1804,11 +1804,11 @@ public class ReplicationControlManagerTest { @@ -1804,11 +1804,11 @@ public class ReplicationControlManagerTest {
alterResult.response());
ctx.replay(alterResult.records());
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(2).setPartitionEpoch(2).build(), replication.getPartition(fooId, 0));
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(fooId, 0));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 0}).setIsr(new int[] {0, 1, 2}).
setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(fooId, 1));
setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(fooId, 1));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 0}).setIsr(new int[] {4, 2}).
setAddingReplicas(new int[] {0, 1}).setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(barId, 0));
setAddingReplicas(new int[] {0, 1}).setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(barId, 0));
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(asList(new OngoingTopicReassignment().
@ -1829,13 +1829,13 @@ public class ReplicationControlManagerTest { @@ -1829,13 +1829,13 @@ public class ReplicationControlManagerTest {
new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104).
setTopics(asList(new TopicData().setTopicId(barId).setPartitions(asList(
new PartitionData().setPartitionIndex(0).setPartitionEpoch(2).
setLeaderEpoch(1).setNewIsrWithEpochs(isrWithDefaultEpoch(4, 1, 2, 0)))))));
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(4, 1, 2, 0)))))));
assertEquals(new AlterPartitionResponseData().setTopics(asList(
new AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(asList(
new AlterPartitionResponseData.PartitionData().
setPartitionIndex(0).
setLeaderId(4).
setLeaderEpoch(1).
setLeaderEpoch(0).
setIsr(asList(4, 1, 2, 0)).
setPartitionEpoch(3).
setErrorCode(NONE.code()))))),
@ -1867,7 +1867,7 @@ public class ReplicationControlManagerTest { @@ -1867,7 +1867,7 @@ public class ReplicationControlManagerTest {
ctx.replay(cancelResult.records());
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {2, 3, 4}).setIsr(new int[] {4, 2}).
setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(2).setPartitionEpoch(3).build(), replication.getPartition(barId, 0));
setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(3).build(), replication.getPartition(barId, 0));
}
@Test

11
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

@ -173,8 +173,11 @@ public enum MetadataVersion { @@ -173,8 +173,11 @@ public enum MetadataVersion {
// Adds replica epoch to Fetch request (KIP-903).
IBP_3_5_IV1(10, "3.5", "IV1", false),
// Support for SCRAM
IBP_3_5_IV2(11, "3.5", "IV2", true);
// Support for SCRAM
IBP_3_5_IV2(11, "3.5", "IV2", true),
// Remove leader epoch bump when KRaft controller shrinks the ISR (KAFKA-15021)
IBP_3_6_IV0(12, "3.6", "IV0", false);
// NOTE: update the default version in @ClusterTest annotation to point to the latest version
public static final String FEATURE_NAME = "metadata.version";
@ -260,6 +263,10 @@ public enum MetadataVersion { @@ -260,6 +263,10 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_5_IV2);
}
public boolean isSkipLeaderEpochBumpSupported() {
return this.isAtLeast(IBP_3_6_IV0);
}
public boolean isKRaftSupported() {
return this.featureLevel > 0;
}

4
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java

@ -67,7 +67,7 @@ public class FeatureCommandTest { @@ -67,7 +67,7 @@ public class FeatureCommandTest {
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
);
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
"SupportedMaxVersion: 3.6-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
}
@ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
@ -125,7 +125,7 @@ public class FeatureCommandTest { @@ -125,7 +125,7 @@ public class FeatureCommandTest {
"disable", "--feature", "metadata.version"))
);
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
"metadata.version. Local controller 3000 only supports versions 1-11", commandOutput);
"metadata.version. Local controller 3000 only supports versions 1-12", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),

Loading…
Cancel
Save