From 5d0cb1419cd1f1cdfb7bc04ed4760d5a0eae0aa1 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Sun, 8 Dec 2019 11:38:50 -0800 Subject: [PATCH] KAFKA-9212; Ensure LeaderAndIsr state updated in controller context during reassignment (#7795) KIP-320 improved fetch semantics by adding leader epoch validation. This relies on reliable propagation of leader epoch information from the controller. Unfortunately, we have encountered a bug during partition reassignment in which the leader epoch in the controller context does not get properly updated. This causes UpdateMetadata requests to be sent with stale epoch information which results in the metadata caches on the brokers falling out of sync. This bug has existed for a long time, but it is only a problem due to the new epoch validation done by the client. Because the client includes the stale leader epoch in its requests, the leader rejects them, yet the stale metadata cache on the brokers prevents the consumer from getting the latest epoch. Hence the consumer cannot make progress while a reassignment is ongoing. Although it is straightforward to fix this problem in the controller for the new releases (which this patch does), it is not so easy to fix older brokers which means new clients could still encounter brokers with this bug. To address this problem, this patch also modifies the client to treat the leader epoch returned from the Metadata response as "unreliable" if it comes from an older version of the protocol. The client in this case will discard the returned epoch and it won't be included in any requests. Also, note that the correct epoch is still forwarded to replicas correctly in the LeaderAndIsr request, so this bug does not affect replication. Reviewers: Jun Rao , Stanislav Kozlovski , Ismael Juma --- .../org/apache/kafka/clients/Metadata.java | 29 +++-- .../common/requests/MetadataResponse.java | 28 ++++- .../apache/kafka/clients/MetadataTest.java | 117 ++++++++++++++++++ .../kafka/controller/KafkaController.scala | 14 ++- .../admin/ReassignPartitionsClusterTest.scala | 45 +++++++ 5 files changed, 210 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index a9e68b861fe..82c1b07e2ed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -298,12 +298,10 @@ public class Metadata implements Closeable { if (metadata.isInternal()) internalTopics.add(metadata.topic()); for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { - - // Even if the partition's metadata includes an error, we need to handle the update to catch new epochs - updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> { - int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH); - partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch)); - }); + // Even if the partition's metadata includes an error, we need to handle + // the update to catch new epochs + updatePartitionInfo(metadata.topic(), partitionMetadata, + metadataResponse.hasReliableLeaderEpochs(), partitions::add); if (partitionMetadata.error().exception() instanceof InvalidMetadataException) { log.debug("Requesting metadata update for partition {} due to error {}", @@ -328,25 +326,26 @@ public class Metadata implements Closeable { */ private void updatePartitionInfo(String topic, MetadataResponse.PartitionMetadata partitionMetadata, - Consumer partitionInfoConsumer) { - + boolean hasReliableLeaderEpoch, + Consumer partitionInfoConsumer) { TopicPartition tp = new TopicPartition(topic, partitionMetadata.partition()); - if (partitionMetadata.leaderEpoch().isPresent()) { + + if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch().isPresent()) { int newEpoch = partitionMetadata.leaderEpoch().get(); // If the received leader epoch is at least the same as the previous one, update the metadata if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch, false)) { - partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata)); + PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata); + partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info, newEpoch)); } else { // Otherwise ignore the new metadata and use the previously cached info - PartitionInfo previousInfo = cache.cluster().partition(tp); - if (previousInfo != null) { - partitionInfoConsumer.accept(previousInfo); - } + cache.getPartitionInfo(tp).ifPresent(partitionInfoConsumer); } } else { // Handle old cluster formats as well as error responses where leader and epoch are missing lastSeenLeaderEpochs.remove(tp); - partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata)); + PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata); + partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info, + RecordBatch.NO_PARTITION_LEADER_EPOCH)); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 9b263b1e72d..e4e09a52705 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -60,13 +60,25 @@ public class MetadataResponse extends AbstractResponse { private final MetadataResponseData data; private volatile Holder holder; + private final boolean hasReliableLeaderEpochs; public MetadataResponse(MetadataResponseData data) { - this.data = data; + this(data, true); } public MetadataResponse(Struct struct, short version) { - this(new MetadataResponseData(struct, version)); + // Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker + // does not propagate leader epoch information accurately while a reassignment is in + // progress. Relying on a stale epoch can lead to FENCED_LEADER_EPOCH errors which + // can prevent consumption throughout the course of a reassignment. It is safer in + // this case to revert to the behavior in previous protocol versions which checks + // leader status only. + this(new MetadataResponseData(struct, version), version >= 9); + } + + private MetadataResponse(MetadataResponseData data, boolean hasReliableLeaderEpochs) { + this.data = data; + this.hasReliableLeaderEpochs = hasReliableLeaderEpochs; } @Override @@ -205,6 +217,18 @@ public class MetadataResponse extends AbstractResponse { return this.data.clusterId(); } + /** + * Check whether the leader epochs returned from the response can be relied on + * for epoch validation in Fetch, ListOffsets, and OffsetsForLeaderEpoch requests. + * If not, then the client will not retain the leader epochs and hence will not + * forward them in requests. + * + * @return true if the epoch can be used for validation + */ + public boolean hasReliableLeaderEpochs() { + return hasReliableLeaderEpochs; + } + public static MetadataResponse parse(ByteBuffer buffer, short version) { return new MetadataResponse(ApiKeys.METADATA.responseSchema(version).read(buffer), version); } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 4517f766642..7067e882bcb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -23,7 +23,14 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBrokerCollection; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic; +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopicCollection; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -33,9 +40,13 @@ import org.apache.kafka.test.TestUtils; import org.junit.Test; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.kafka.test.TestUtils.assertOptional; import static org.junit.Assert.assertEquals; @@ -147,6 +158,112 @@ public class MetadataTest { assertEquals(0, metadata.timeToNextUpdate(now + 1)); } + /** + * Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker does not propagate leader epoch + * information accurately while a reassignment is in progress, so we cannot rely on it. This is explained in more + * detail in MetadataResponse's constructor. + */ + @Test + public void testIgnoreLeaderEpochInOlderMetadataResponse() { + TopicPartition tp = new TopicPartition("topic", 0); + + MetadataResponsePartition partitionMetadata = new MetadataResponsePartition() + .setPartitionIndex(tp.partition()) + .setLeaderId(5) + .setLeaderEpoch(10) + .setReplicaNodes(Arrays.asList(1, 2, 3)) + .setIsrNodes(Arrays.asList(1, 2, 3)) + .setOfflineReplicas(Collections.emptyList()) + .setErrorCode(Errors.NONE.code()); + + MetadataResponseTopic topicMetadata = new MetadataResponseTopic() + .setName(tp.topic()) + .setErrorCode(Errors.NONE.code()) + .setPartitions(Collections.singletonList(partitionMetadata)) + .setIsInternal(false); + + MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection(); + topics.add(topicMetadata); + + MetadataResponseData data = new MetadataResponseData() + .setClusterId("clusterId") + .setControllerId(0) + .setTopics(topics) + .setBrokers(new MetadataResponseBrokerCollection()); + + for (short version = ApiKeys.METADATA.oldestVersion(); version < 9; version++) { + Struct struct = data.toStruct(version); + MetadataResponse response = new MetadataResponse(struct, version); + assertFalse(response.hasReliableLeaderEpochs()); + metadata.update(response, 100); + assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent()); + MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get(); + assertEquals(-1, info.epoch()); + } + + for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) { + Struct struct = data.toStruct(version); + MetadataResponse response = new MetadataResponse(struct, version); + assertTrue(response.hasReliableLeaderEpochs()); + metadata.update(response, 100); + assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent()); + MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get(); + assertEquals(10, info.epoch()); + } + } + + @Test + public void testStaleMetadata() { + TopicPartition tp = new TopicPartition("topic", 0); + + MetadataResponsePartition partitionMetadata = new MetadataResponsePartition() + .setPartitionIndex(tp.partition()) + .setLeaderId(1) + .setLeaderEpoch(10) + .setReplicaNodes(Arrays.asList(1, 2, 3)) + .setIsrNodes(Arrays.asList(1, 2, 3)) + .setOfflineReplicas(Collections.emptyList()) + .setErrorCode(Errors.NONE.code()); + + MetadataResponseTopic topicMetadata = new MetadataResponseTopic() + .setName(tp.topic()) + .setErrorCode(Errors.NONE.code()) + .setPartitions(Collections.singletonList(partitionMetadata)) + .setIsInternal(false); + + MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection(); + topics.add(topicMetadata); + + MetadataResponseData data = new MetadataResponseData() + .setClusterId("clusterId") + .setControllerId(0) + .setTopics(topics) + .setBrokers(new MetadataResponseBrokerCollection()); + + metadata.update(new MetadataResponse(data), 100); + + // Older epoch with changed ISR should be ignored + partitionMetadata + .setPartitionIndex(tp.partition()) + .setLeaderId(1) + .setLeaderEpoch(9) + .setReplicaNodes(Arrays.asList(1, 2, 3)) + .setIsrNodes(Arrays.asList(1, 2)) + .setOfflineReplicas(Collections.emptyList()) + .setErrorCode(Errors.NONE.code()); + + metadata.update(new MetadataResponse(data), 101); + assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp)); + + assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent()); + MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get(); + + List cachedIsr = Arrays.stream(info.partitionInfo().inSyncReplicas()) + .map(Node::id).collect(Collectors.toList()); + assertEquals(Arrays.asList(1, 2, 3), cachedIsr); + assertEquals(10, info.epoch()); + } + @Test public void testFailedUpdate() { long time = 100; diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 33bb00b48a0..444e74d1505 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1081,14 +1081,16 @@ class KafkaController(val config: KafkaConfig, val UpdateLeaderAndIsrResult(finishedUpdates, _) = zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion) - finishedUpdates.headOption.map { - case (partition, Right(leaderAndIsr)) => - finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)) + finishedUpdates.get(partition) match { + case Some(Right(leaderAndIsr)) => + val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, epoch) + controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) + finalLeaderIsrAndControllerEpoch = Some(leaderIsrAndControllerEpoch) info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}") true - case (_, Left(e)) => - throw e - }.getOrElse(false) + case Some(Left(e)) => throw e + case None => false + } case None => throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " + "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist") diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 72e1408a861..d71db70c16d 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -750,6 +750,41 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { assertEquals(Seq(101), zkClient.getReplicasForPartition(tp0)) } + @Test + def testProduceAndConsumeWithReassignmentInProgress(): Unit = { + startBrokers(Seq(100, 101)) + adminClient = createAdminClient(servers) + createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100)), servers = servers) + + produceMessages(tp0.topic, 500, acks = -1, valueLength = 100 * 1000) + + TestUtils.throttleAllBrokersReplication(adminClient, Seq(101), throttleBytes = 1) + TestUtils.assignThrottledPartitionReplicas(adminClient, Map(tp0 -> Seq(101))) + + adminClient.alterPartitionReassignments( + Map(reassignmentEntry(tp0, Seq(100, 101))).asJava + ).all().get() + + awaitReassignmentInProgress(tp0) + + produceMessages(tp0.topic, 500, acks = -1, valueLength = 64) + val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers)) + try { + consumer.assign(Seq(tp0).asJava) + pollUntilAtLeastNumRecords(consumer, numRecords = 1000) + } finally { + consumer.close() + } + + assertTrue(isAssignmentInProgress(tp0)) + + TestUtils.resetBrokersThrottle(adminClient, Seq(101)) + TestUtils.removePartitionReplicaThrottles(adminClient, Set(tp0)) + + waitForAllReassignmentsToComplete() + assertEquals(Seq(100, 101), zkClient.getReplicasForPartition(tp0)) + } + @Test def shouldListMovingPartitionsThroughApi(): Unit = { startBrokers(Seq(100, 101)) @@ -1228,6 +1263,16 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { s"Znode ${ReassignPartitionsZNode.path} wasn't deleted", pause = pause) } + def awaitReassignmentInProgress(topicPartition: TopicPartition): Unit = { + waitUntilTrue(() => isAssignmentInProgress(topicPartition), + "Timed out waiting for expected reassignment to begin") + } + + def isAssignmentInProgress(topicPartition: TopicPartition): Boolean = { + val reassignments = adminClient.listPartitionReassignments().reassignments().get() + reassignments.asScala.get(topicPartition).isDefined + } + def waitForAllReassignmentsToComplete(pause: Long = 100L): Unit = { waitUntilTrue(() => adminClient.listPartitionReassignments().reassignments().get().isEmpty, s"There still are ongoing reassignments", pause = pause)