
KAFKA-9212; Ensure LeaderAndIsr state updated in controller context during reassignment (#7795)

KIP-320 improved fetch semantics by adding leader epoch validation. This relies on
reliable propagation of leader epoch information from the controller. Unfortunately, we
have encountered a bug during partition reassignment in which the leader epoch in the
controller context does not get properly updated. This causes UpdateMetadata requests
to be sent with stale epoch information, which results in the metadata caches on the
brokers falling out of sync.

This bug has existed for a long time, but it only became a problem with the new epoch
validation done by the client. Because the client includes the stale leader epoch in its
requests, the leader rejects them, yet the stale metadata cache on the brokers prevents
the consumer from getting the latest epoch. Hence the consumer cannot make progress
while a reassignment is ongoing.
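
To illustrate the failure mode, here is a rough sketch of the broker-side epoch check that KIP-320 introduces. This is not code from this patch; the class and method names are invented for illustration, though the error codes are the real ones from org.apache.kafka.common.protocol.Errors. A request carrying a leader epoch older than the leader's current epoch is fenced, so a consumer whose metadata cache never advances keeps getting rejected.

import org.apache.kafka.common.protocol.Errors;

// Hypothetical sketch of the KIP-320 leader epoch validation performed by the leader.
// A stale epoch from the client is fenced; an epoch newer than the replica's is unknown.
public class LeaderEpochValidationSketch {
    static Errors validate(int requestLeaderEpoch, int currentLeaderEpoch) {
        if (requestLeaderEpoch < currentLeaderEpoch)
            return Errors.FENCED_LEADER_EPOCH;   // client metadata is stale; it must refresh
        else if (requestLeaderEpoch > currentLeaderEpoch)
            return Errors.UNKNOWN_LEADER_EPOCH;  // this replica has not yet seen the newer epoch
        else
            return Errors.NONE;
    }
}

Because the stale metadata cache keeps returning the old epoch, every retry takes the FENCED_LEADER_EPOCH path above, which is why consumption stalls for the duration of the reassignment.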

Although it is straightforward to fix this problem in the controller for the new releases
(which this patch does), it is not so easy to fix older brokers, which means new clients
could still encounter brokers with this bug. To address this problem, this patch also
modifies the client to treat the leader epoch returned from the Metadata response as
"unreliable" if it comes from an older version of the protocol. The client in this case will
discard the returned epoch and it won't be included in any requests.
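
As a minimal sketch of that fallback, assuming we only look at the Metadata response version (the class and method names below are invented; the real logic is the hasReliableLeaderEpochs flag threaded through MetadataResponse and Metadata in the diff further down), the idea is simply to drop the epoch when the response came from a pre-v9 Metadata version:

import java.util.Optional;

// Simplified sketch of the client-side fallback: leader epochs from Metadata responses
// older than version 9 (i.e. pre Kafka 2.4 brokers) may be stale during a reassignment,
// so the client discards them and falls back to validating on leader id only.
public class ReliableEpochSketch {
    static Optional<Integer> usableLeaderEpoch(short metadataVersion, Optional<Integer> leaderEpoch) {
        return metadataVersion >= 9 ? leaderEpoch : Optional.empty();
    }
}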

Also, note that the correct epoch is still forwarded to the replicas in the
LeaderAndIsr request, so this bug does not affect replication.

Reviewers: Jun Rao <junrao@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>
Author: Jason Gustafson (committed by Ismael Juma)
Commit: 5d0cb1419c
5 changed files:
  1. clients/src/main/java/org/apache/kafka/clients/Metadata.java (29 lines changed)
  2. clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java (28 lines changed)
  3. clients/src/test/java/org/apache/kafka/clients/MetadataTest.java (117 lines changed)
  4. core/src/main/scala/kafka/controller/KafkaController.scala (14 lines changed)
  5. core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala (45 lines changed)

clients/src/main/java/org/apache/kafka/clients/Metadata.java (29 lines changed)

@@ -298,12 +298,10 @@ public class Metadata implements Closeable {
             if (metadata.isInternal())
                 internalTopics.add(metadata.topic());
             for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
-                // Even if the partition's metadata includes an error, we need to handle the update to catch new epochs
-                updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> {
-                    int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH);
-                    partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
-                });
+                // Even if the partition's metadata includes an error, we need to handle
+                // the update to catch new epochs
+                updatePartitionInfo(metadata.topic(), partitionMetadata,
+                    metadataResponse.hasReliableLeaderEpochs(), partitions::add);

                 if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
                     log.debug("Requesting metadata update for partition {} due to error {}",
@@ -328,25 +326,26 @@ public class Metadata implements Closeable {
      */
     private void updatePartitionInfo(String topic,
                                      MetadataResponse.PartitionMetadata partitionMetadata,
-                                     Consumer<PartitionInfo> partitionInfoConsumer) {
+                                     boolean hasReliableLeaderEpoch,
+                                     Consumer<MetadataCache.PartitionInfoAndEpoch> partitionInfoConsumer) {
         TopicPartition tp = new TopicPartition(topic, partitionMetadata.partition());
-        if (partitionMetadata.leaderEpoch().isPresent()) {
+
+        if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch().isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch().get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch, false)) {
-                partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata));
+                PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
+                partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info, newEpoch));
             } else {
                 // Otherwise ignore the new metadata and use the previously cached info
-                PartitionInfo previousInfo = cache.cluster().partition(tp);
-                if (previousInfo != null) {
-                    partitionInfoConsumer.accept(previousInfo);
-                }
+                cache.getPartitionInfo(tp).ifPresent(partitionInfoConsumer);
             }
         } else {
             // Handle old cluster formats as well as error responses where leader and epoch are missing
             lastSeenLeaderEpochs.remove(tp);
-            partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata));
+            PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
+            partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH));
         }
     }

clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java (28 lines changed)

@@ -60,13 +60,25 @@ public class MetadataResponse extends AbstractResponse {
     private final MetadataResponseData data;
     private volatile Holder holder;
+    private final boolean hasReliableLeaderEpochs;

     public MetadataResponse(MetadataResponseData data) {
-        this.data = data;
+        this(data, true);
     }

     public MetadataResponse(Struct struct, short version) {
-        this(new MetadataResponseData(struct, version));
+        // Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker
+        // does not propagate leader epoch information accurately while a reassignment is in
+        // progress. Relying on a stale epoch can lead to FENCED_LEADER_EPOCH errors which
+        // can prevent consumption throughout the course of a reassignment. It is safer in
+        // this case to revert to the behavior in previous protocol versions which checks
+        // leader status only.
+        this(new MetadataResponseData(struct, version), version >= 9);
+    }
+
+    private MetadataResponse(MetadataResponseData data, boolean hasReliableLeaderEpochs) {
+        this.data = data;
+        this.hasReliableLeaderEpochs = hasReliableLeaderEpochs;
     }

     @Override
@@ -205,6 +217,18 @@ public class MetadataResponse extends AbstractResponse {
         return this.data.clusterId();
     }

+    /**
+     * Check whether the leader epochs returned from the response can be relied on
+     * for epoch validation in Fetch, ListOffsets, and OffsetsForLeaderEpoch requests.
+     * If not, then the client will not retain the leader epochs and hence will not
+     * forward them in requests.
+     *
+     * @return true if the epoch can be used for validation
+     */
+    public boolean hasReliableLeaderEpochs() {
+        return hasReliableLeaderEpochs;
+    }
+
     public static MetadataResponse parse(ByteBuffer buffer, short version) {
         return new MetadataResponse(ApiKeys.METADATA.responseSchema(version).read(buffer), version);
     }

clients/src/test/java/org/apache/kafka/clients/MetadataTest.java (117 lines changed)

@@ -23,7 +23,14 @@ import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBrokerCollection;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopicCollection;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -33,9 +40,13 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Test;

 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;

 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
@@ -147,6 +158,112 @@ public class MetadataTest {
         assertEquals(0, metadata.timeToNextUpdate(now + 1));
     }

+    /**
+     * Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker does not propagate leader epoch
+     * information accurately while a reassignment is in progress, so we cannot rely on it. This is explained in more
+     * detail in MetadataResponse's constructor.
+     */
+    @Test
+    public void testIgnoreLeaderEpochInOlderMetadataResponse() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        MetadataResponsePartition partitionMetadata = new MetadataResponsePartition()
+            .setPartitionIndex(tp.partition())
+            .setLeaderId(5)
+            .setLeaderEpoch(10)
+            .setReplicaNodes(Arrays.asList(1, 2, 3))
+            .setIsrNodes(Arrays.asList(1, 2, 3))
+            .setOfflineReplicas(Collections.emptyList())
+            .setErrorCode(Errors.NONE.code());
+
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+            .setName(tp.topic())
+            .setErrorCode(Errors.NONE.code())
+            .setPartitions(Collections.singletonList(partitionMetadata))
+            .setIsInternal(false);
+
+        MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+
+        MetadataResponseData data = new MetadataResponseData()
+            .setClusterId("clusterId")
+            .setControllerId(0)
+            .setTopics(topics)
+            .setBrokers(new MetadataResponseBrokerCollection());
+
+        for (short version = ApiKeys.METADATA.oldestVersion(); version < 9; version++) {
+            Struct struct = data.toStruct(version);
+            MetadataResponse response = new MetadataResponse(struct, version);
+            assertFalse(response.hasReliableLeaderEpochs());
+            metadata.update(response, 100);
+            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+            MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+            assertEquals(-1, info.epoch());
+        }
+
+        for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
+            Struct struct = data.toStruct(version);
+            MetadataResponse response = new MetadataResponse(struct, version);
+            assertTrue(response.hasReliableLeaderEpochs());
+            metadata.update(response, 100);
+            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+            MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+            assertEquals(10, info.epoch());
+        }
+    }
+
+    @Test
+    public void testStaleMetadata() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+
+        MetadataResponsePartition partitionMetadata = new MetadataResponsePartition()
+            .setPartitionIndex(tp.partition())
+            .setLeaderId(1)
+            .setLeaderEpoch(10)
+            .setReplicaNodes(Arrays.asList(1, 2, 3))
+            .setIsrNodes(Arrays.asList(1, 2, 3))
+            .setOfflineReplicas(Collections.emptyList())
+            .setErrorCode(Errors.NONE.code());
+
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+            .setName(tp.topic())
+            .setErrorCode(Errors.NONE.code())
+            .setPartitions(Collections.singletonList(partitionMetadata))
+            .setIsInternal(false);
+
+        MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+
+        MetadataResponseData data = new MetadataResponseData()
+            .setClusterId("clusterId")
+            .setControllerId(0)
+            .setTopics(topics)
+            .setBrokers(new MetadataResponseBrokerCollection());
+
+        metadata.update(new MetadataResponse(data), 100);
+
+        // Older epoch with changed ISR should be ignored
+        partitionMetadata
+            .setPartitionIndex(tp.partition())
+            .setLeaderId(1)
+            .setLeaderEpoch(9)
+            .setReplicaNodes(Arrays.asList(1, 2, 3))
+            .setIsrNodes(Arrays.asList(1, 2))
+            .setOfflineReplicas(Collections.emptyList())
+            .setErrorCode(Errors.NONE.code());
+
+        metadata.update(new MetadataResponse(data), 101);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+        MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+
+        List<Integer> cachedIsr = Arrays.stream(info.partitionInfo().inSyncReplicas())
+            .map(Node::id).collect(Collectors.toList());
+        assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
+        assertEquals(10, info.epoch());
+    }
+
     @Test
     public void testFailedUpdate() {
         long time = 100;

core/src/main/scala/kafka/controller/KafkaController.scala (14 lines changed)

@@ -1081,14 +1081,16 @@ class KafkaController(val config: KafkaConfig,
         val UpdateLeaderAndIsrResult(finishedUpdates, _) =
           zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
-        finishedUpdates.headOption.map {
-          case (partition, Right(leaderAndIsr)) =>
-            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
+        finishedUpdates.get(partition) match {
+          case Some(Right(leaderAndIsr)) =>
+            val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)
+            controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+            finalLeaderIsrAndControllerEpoch = Some(leaderIsrAndControllerEpoch)
             info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}")
             true
-          case (_, Left(e)) =>
-            throw e
-        }.getOrElse(false)
+          case Some(Left(e)) => throw e
+          case None => false
+        }
       case None =>
         throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +
           "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist")

core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala (45 lines changed)

@@ -750,6 +750,41 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     assertEquals(Seq(101), zkClient.getReplicasForPartition(tp0))
   }

+  @Test
+  def testProduceAndConsumeWithReassignmentInProgress(): Unit = {
+    startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
+    createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100)), servers = servers)
+    produceMessages(tp0.topic, 500, acks = -1, valueLength = 100 * 1000)
+    TestUtils.throttleAllBrokersReplication(adminClient, Seq(101), throttleBytes = 1)
+    TestUtils.assignThrottledPartitionReplicas(adminClient, Map(tp0 -> Seq(101)))
+
+    adminClient.alterPartitionReassignments(
+      Map(reassignmentEntry(tp0, Seq(100, 101))).asJava
+    ).all().get()
+
+    awaitReassignmentInProgress(tp0)
+
+    produceMessages(tp0.topic, 500, acks = -1, valueLength = 64)
+    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers))
+    try {
+      consumer.assign(Seq(tp0).asJava)
+      pollUntilAtLeastNumRecords(consumer, numRecords = 1000)
+    } finally {
+      consumer.close()
+    }
+
+    assertTrue(isAssignmentInProgress(tp0))
+
+    TestUtils.resetBrokersThrottle(adminClient, Seq(101))
+    TestUtils.removePartitionReplicaThrottles(adminClient, Set(tp0))
+    waitForAllReassignmentsToComplete()
+
+    assertEquals(Seq(100, 101), zkClient.getReplicasForPartition(tp0))
+  }
+
   @Test
   def shouldListMovingPartitionsThroughApi(): Unit = {
     startBrokers(Seq(100, 101))
@@ -1228,6 +1263,16 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
       s"Znode ${ReassignPartitionsZNode.path} wasn't deleted", pause = pause)
   }

+  def awaitReassignmentInProgress(topicPartition: TopicPartition): Unit = {
+    waitUntilTrue(() => isAssignmentInProgress(topicPartition),
+      "Timed out waiting for expected reassignment to begin")
+  }
+
+  def isAssignmentInProgress(topicPartition: TopicPartition): Boolean = {
+    val reassignments = adminClient.listPartitionReassignments().reassignments().get()
+    reassignments.asScala.get(topicPartition).isDefined
+  }
+
   def waitForAllReassignmentsToComplete(pause: Long = 100L): Unit = {
     waitUntilTrue(() => adminClient.listPartitionReassignments().reassignments().get().isEmpty,
       s"There still are ongoing reassignments", pause = pause)
