diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index c5c6d04e60a..cedbe15f092 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -41,7 +41,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.function.Predicate; import java.util.function.Supplier; import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH; @@ -156,14 +155,35 @@ public class Metadata implements Closeable { } /** - * Request an update for the partition metadata iff the given leader epoch is newer than the last seen leader epoch + * Request an update for the partition metadata iff we have seen a newer leader epoch. This is called by the client + * any time it handles a response from the broker that includes leader epoch, except for UpdateMetadata which + * follows a different code path ({@link #update}). + * + * @param topicPartition + * @param leaderEpoch + * @return true if we updated the last seen epoch, false otherwise */ public synchronized boolean updateLastSeenEpochIfNewer(TopicPartition topicPartition, int leaderEpoch) { Objects.requireNonNull(topicPartition, "TopicPartition cannot be null"); if (leaderEpoch < 0) throw new IllegalArgumentException("Invalid leader epoch " + leaderEpoch + " (must be non-negative)"); - boolean updated = updateLastSeenEpoch(topicPartition, leaderEpoch, oldEpoch -> leaderEpoch > oldEpoch); + Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition); + log.trace("Determining if we should replace existing epoch {} with new epoch {} for partition {}", oldEpoch, leaderEpoch, topicPartition); + + final boolean updated; + if (oldEpoch == null) { + log.debug("Not replacing null epoch with new epoch {} for partition {}", leaderEpoch, topicPartition); + updated = false; + } else if (leaderEpoch > oldEpoch) { + log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch, leaderEpoch, topicPartition); + lastSeenLeaderEpochs.put(topicPartition, leaderEpoch); + updated = true; + } else { + log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, leaderEpoch, topicPartition); + updated = false; + } + this.needFullUpdate = this.needFullUpdate || updated; return updated; } @@ -172,29 +192,6 @@ public class Metadata implements Closeable { return Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition)); } - /** - * Conditionally update the leader epoch for a partition - * - * @param topicPartition topic+partition to update the epoch for - * @param epoch the new epoch - * @param epochTest a predicate to determine if the old epoch should be replaced - * @return true if the epoch was updated, false otherwise - */ - private synchronized boolean updateLastSeenEpoch(TopicPartition topicPartition, - int epoch, - Predicate epochTest) { - Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition); - log.trace("Determining if we should replace existing epoch {} with new epoch {}", oldEpoch, epoch); - if (oldEpoch == null || epochTest.test(oldEpoch)) { - log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch, epoch, topicPartition); - lastSeenLeaderEpochs.put(topicPartition, epoch); - return true; - } else { - log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, epoch, topicPartition); - return false; - } - } - /** * Check whether an update has been explicitly requested. * @@ -373,10 +370,14 @@ public class Metadata implements Closeable { if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) { int newEpoch = partitionMetadata.leaderEpoch.get(); // If the received leader epoch is at least the same as the previous one, update the metadata - if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch)) { + Integer currentEpoch = lastSeenLeaderEpochs.get(tp); + if (currentEpoch == null || newEpoch >= currentEpoch) { + log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch); + lastSeenLeaderEpochs.put(tp, newEpoch); return Optional.of(partitionMetadata); } else { // Otherwise ignore the new metadata and use the previously cached info + log.debug("Got metadata for an older epoch {} (current is {}) for partition {}, not updating", newEpoch, currentEpoch, tp); return cache.partitionMetadata(tp); } } else { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 68c7347e20c..c3f10bac8c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -486,7 +486,7 @@ public class Fetcher implements Closeable { // Validate each partition against the current leader and epoch subscriptions.assignedPartitions().forEach(topicPartition -> { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition); - subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch); + subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch); }); // Collect positions needing validation, with backoff @@ -756,7 +756,7 @@ public class Fetcher implements Closeable { } } - private boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) { + static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) { ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH); if (apiVersion == null) return false; @@ -1101,8 +1101,9 @@ public class Fetcher implements Closeable { Map fetchable = new LinkedHashMap<>(); // Ensure the position has an up-to-date leader - subscriptions.assignedPartitions().forEach( - tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.currentLeader(tp))); + subscriptions.assignedPartitions().forEach(tp -> + subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp, metadata.currentLeader(tp)) + ); long currentTimeMs = time.milliseconds(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 5b375da5a4f..4d93a1b4fcb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -45,6 +47,8 @@ import java.util.regex.Pattern; import java.util.stream.Collector; import java.util.stream.Collectors; +import static org.apache.kafka.clients.consumer.internals.Fetcher.hasUsableOffsetForLeaderEpochVersion; + /** * A class for tracking the topics, partitions, and offsets for the consumer. A partition * is "assigned" either directly with {@link #assignFromUser(Set)} (manual assignment) @@ -422,8 +426,29 @@ public class SubscriptionState { assignedState(tp).position(position); } - public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) { - return assignedState(tp).maybeValidatePosition(leaderAndEpoch); + /** + * Enter the offset validation state if the leader for this partition is known to support a usable version of the + * OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete the offset validation. + * + * @param apiVersions + * @param tp + * @param leaderAndEpoch + * @return true if we enter the offset validation state + */ + public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition tp, + Metadata.LeaderAndEpoch leaderAndEpoch) { + if (leaderAndEpoch.leader.isPresent()) { + NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString()); + if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) { + return assignedState(tp).maybeValidatePosition(leaderAndEpoch); + } else { + // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation + completeValidation(tp); + return false; + } + } else { + return assignedState(tp).maybeValidatePosition(leaderAndEpoch); + } } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 5203745d11e..96cd22c4111 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -313,6 +313,10 @@ public class MetadataTest { boolean[] updateResult = {true, false, false, false, false, true, false, false, false, true}; TopicPartition tp = new TopicPartition("topic", 0); + MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, + Collections.emptyMap(), Collections.singletonMap("topic", 1), _tp -> 0); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L); + for (int i = 0; i < epochs.length; i++) { metadata.updateLastSeenEpochIfNewer(tp, epochs[i]); if (updateResult[i]) { @@ -325,6 +329,46 @@ public class MetadataTest { } } + @Test + public void testUpdateLastEpoch() { + TopicPartition tp = new TopicPartition("topic-1", 0); + + MetadataResponse metadataResponse = emptyMetadataResponse(); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L); + + // if we have no leader epoch, this call shouldn't do anything + assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0)); + assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1)); + assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 2)); + assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent()); + + // Metadata with newer epoch is handled + metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10)); + + // Don't update to an older one + assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1)); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10)); + + // Don't cause update if it's the same one + assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 10)); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10)); + + // Update if we see newer epoch + assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 12)); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); + + metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); + + // Don't overwrite metadata with older epoch + metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 11); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); + } + @Test public void testRejectOldMetadata() { Map partitionCounts = new HashMap<>(); @@ -378,26 +422,6 @@ public class MetadataTest { } } - @Test - public void testMaybeRequestUpdate() { - TopicPartition tp = new TopicPartition("topic-1", 0); - metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L); - assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 1)); - assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1); - - metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 1L); - assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1)); - assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1); - - metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 2L); - assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0)); - assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1); - - metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 3L); - assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 2)); - assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 2); - } - @Test public void testOutOfBandEpochUpdate() { Map partitionCounts = new HashMap<>(); @@ -406,7 +430,7 @@ public class MetadataTest { metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L); - assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 99)); + assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 99)); // Update epoch to 100 MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100); @@ -414,7 +438,7 @@ public class MetadataTest { assertNotNull(metadata.fetch().partition(tp)); assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100); - // Simulate a leader epoch from another response, like a fetch response (not yet implemented) + // Simulate a leader epoch from another response, like a fetch response or list offsets assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 101)); // Cache of partition stays, but current partition info is not available since it's stale @@ -454,6 +478,11 @@ public class MetadataTest { assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent()); assertEquals(0, metadata.partitionMetadataIfCurrent(tp).get().partition()); assertEquals(Optional.of(0), metadata.partitionMetadataIfCurrent(tp).get().leaderId); + + // Since epoch was null, this shouldn't update it + metadata.updateLastSeenEpochIfNewer(tp, 10); + assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent()); + assertFalse(metadata.partitionMetadataIfCurrent(tp).get().leaderEpoch.isPresent()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index f395b730c52..24ebafa84cc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1593,6 +1593,53 @@ public class FetcherTest { assertEquals(5, subscriptions.position(tp0).offset); } + @Test + public void testListOffsetNoUpdateMissingEpoch() { + buildFetcher(); + + // Set up metadata with no leader epoch + subscriptions.assignFromUser(singleton(tp0)); + MetadataResponse metadataWithNoLeaderEpochs = TestUtils.metadataUpdateWith( + "kafka-cluster", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp -> null); + client.updateMetadata(metadataWithNoLeaderEpochs); + + // Return a ListOffsets response with leaderEpoch=1, we should ignore it + subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), + listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 1)); + fetcher.resetOffsetsIfNeeded(); + consumerClient.pollNoWakeup(); + + // Reset should be satisfied and no metadata update requested + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertFalse(metadata.updateRequested()); + assertFalse(metadata.lastSeenLeaderEpoch(tp0).isPresent()); + } + + @Test + public void testListOffsetUpdateEpoch() { + buildFetcher(); + + // Set up metadata with leaderEpoch=1 + subscriptions.assignFromUser(singleton(tp0)); + MetadataResponse metadataWithLeaderEpochs = TestUtils.metadataUpdateWith( + "kafka-cluster", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp -> 1); + client.updateMetadata(metadataWithLeaderEpochs); + + // Reset offsets to trigger ListOffsets call + subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST); + + // Now we see a ListOffsets with leaderEpoch=2 epoch, we trigger a metadata update + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, 1), + listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 2)); + fetcher.resetOffsetsIfNeeded(); + consumerClient.pollNoWakeup(); + + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertTrue(metadata.updateRequested()); + assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> assertEquals((long) epoch, 2)); + } + @Test public void testUpdateFetchPositionDisconnect() { buildFetcher(); @@ -3673,18 +3720,35 @@ public class FetcherTest { apiVersions.update(node.idString(), NodeApiVersions.create( ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2)); - // Seek with a position and leader+epoch - Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch( - metadata.currentLeader(tp0).leader, Optional.of(epochOne)); - subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); + { + // Seek with a position and leader+epoch + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch( + metadata.currentLeader(tp0).leader, Optional.of(epochOne)); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); - // Update metadata to epoch=2, enter validation - metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, - Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L); - fetcher.validateOffsetsIfNeeded(); + // Update metadata to epoch=2, enter validation + metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, + Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L); + fetcher.validateOffsetsIfNeeded(); - // Offset validation is skipped - assertFalse(subscriptions.awaitingValidation(tp0)); + // Offset validation is skipped + assertFalse(subscriptions.awaitingValidation(tp0)); + } + + { + // Seek with a position and leader+epoch + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch( + metadata.currentLeader(tp0).leader, Optional.of(epochOne)); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); + + // Update metadata to epoch=2, enter validation + metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, + Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L); + + // Subscription should not stay in AWAITING_VALIDATION in prepareFetchRequest + assertEquals(1, fetcher.sendFetches()); + assertFalse(subscriptions.awaitingValidation(tp0)); + } } @Test @@ -3764,7 +3828,7 @@ public class FetcherTest { Optional.of(epochTwo), new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochTwo))); subscriptions.position(tp0, nextPosition); - subscriptions.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree))); + subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree))); // Prepare offset list response from async validation with epoch=2 Map endOffsetMap = new HashMap<>(); @@ -4065,13 +4129,27 @@ public class FetcherTest { }; } + private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, final int leaderEpoch) { + // matches any list offset request with the provided timestamp + return body -> { + ListOffsetRequest req = (ListOffsetRequest) body; + return req.partitionTimestamps().equals(Collections.singletonMap( + tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.of(leaderEpoch)))); + }; + } + private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) { return listOffsetResponse(tp0, error, timestamp, offset); } private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) { - ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp, offset, - Optional.empty()); + return listOffsetResponse(tp, error, timestamp, offset, null); + } + + private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset, + Integer leaderEpoch) { + ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData( + error, timestamp, offset, Optional.ofNullable(leaderEpoch)); Map allPartitionData = new HashMap<>(); allPartitionData.put(tp, partitionData); return new ListOffsetResponse(allPartitionData); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 96f08f52acb..cc74652abe8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -16,12 +16,15 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.EpochEndOffset; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; @@ -382,13 +385,15 @@ public class SubscriptionStateTest { new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5)))); assertTrue(state.hasValidPosition(tp0)); assertFalse(state.awaitingValidation(tp0)); + ApiVersions apiVersions = new ApiVersions(); + apiVersions.update(broker1.idString(), NodeApiVersions.create()); - assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch( + assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch( Optional.of(broker1), Optional.empty()))); assertTrue(state.hasValidPosition(tp0)); assertFalse(state.awaitingValidation(tp0)); - assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch( + assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch( Optional.of(broker1), Optional.of(10)))); assertTrue(state.hasValidPosition(tp0)); assertFalse(state.awaitingValidation(tp0)); @@ -414,6 +419,9 @@ public class SubscriptionStateTest { @Test public void testSeekUnvalidatedWithOffsetEpoch() { Node broker1 = new Node(1, "localhost", 9092); + ApiVersions apiVersions = new ApiVersions(); + apiVersions.update(broker1.idString(), NodeApiVersions.create()); + state.assignFromUser(Collections.singleton(tp0)); state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2), @@ -422,19 +430,19 @@ public class SubscriptionStateTest { assertTrue(state.awaitingValidation(tp0)); // Update using the current leader and epoch - assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch( + assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch( Optional.of(broker1), Optional.of(5)))); assertFalse(state.hasValidPosition(tp0)); assertTrue(state.awaitingValidation(tp0)); // Update with a newer leader and epoch - assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch( + assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch( Optional.of(broker1), Optional.of(15)))); assertFalse(state.hasValidPosition(tp0)); assertTrue(state.awaitingValidation(tp0)); // If the updated leader has no epoch information, then skip validation and begin fetching - assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch( + assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch( Optional.of(broker1), Optional.empty()))); assertTrue(state.hasValidPosition(tp0)); assertFalse(state.awaitingValidation(tp0)); @@ -510,6 +518,34 @@ public class SubscriptionStateTest { assertEquals(initialPosition, state.position(tp0)); } + @Test + public void testMaybeValidatePositionForCurrentLeader() { + NodeApiVersions oldApis = NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2); + ApiVersions apiVersions = new ApiVersions(); + apiVersions.update("1", oldApis); + + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5), + new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10)))); + + // if API is too old to be usable, we just skip validation + assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch( + Optional.of(broker1), Optional.of(10)))); + assertTrue(state.hasValidPosition(tp0)); + + // New API + apiVersions.update("1", NodeApiVersions.create()); + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5), + new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10)))); + + // API is too old to be usable, we just skip validation + assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch( + Optional.of(broker1), Optional.of(10)))); + assertFalse(state.hasValidPosition(tp0)); + } + @Test public void testMaybeCompleteValidationAfterPositionChange() { Node broker1 = new Node(1, "localhost", 9092);