
KAFKA-9724 Newer clients not always sending fetch request to older brokers (#8376)

Newer clients could get stuck in the offset validation phase when a broker didn't support it. This commit bypasses the AWAITING_VALIDATION state when the broker is on an older version of the OffsetsForLeaderEpoch RPC.
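In outline, the fix gates offset validation on the fetch-target broker's advertised API versions. A minimal sketch of the decision rule (the helper class and method name here are hypothetical; the real logic lives in SubscriptionState.maybeValidatePositionForCurrentLeader and Fetcher.hasUsableOffsetForLeaderEpochVersion in the diff below):

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.ApiKeys;

final class ValidationGate {
    // Returns true if the consumer should enter (or remain in) AWAITING_VALIDATION.
    static boolean shouldEnterValidation(ApiVersions apiVersions, Node leader) {
        NodeApiVersions versions = apiVersions.get(leader.idString());
        // Versions not yet known for this node: optimistically attempt validation.
        if (versions == null)
            return true;
        // Known versions: validate only when the broker exposes a usable
        // OffsetsForLeaderEpoch; the real check also enforces a minimum version.
        return versions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH) != null;
    }
}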
David Arthur authored 5 years ago, committed by GitHub
commit 1672a75e1f
6 changed files:
  1. clients/src/main/java/org/apache/kafka/clients/Metadata.java (55 lines changed)
  2. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (9 lines changed)
  3. clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java (29 lines changed)
  4. clients/src/test/java/org/apache/kafka/clients/MetadataTest.java (73 lines changed)
  5. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (104 lines changed)
  6. clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java (46 lines changed)

clients/src/main/java/org/apache/kafka/clients/Metadata.java (55 lines changed)

@@ -41,7 +41,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
@@ -156,14 +155,35 @@ public class Metadata implements Closeable {
}
/**
-* Request an update for the partition metadata iff the given leader epoch is newer than the last seen leader epoch
+* Request an update for the partition metadata iff we have seen a newer leader epoch. This is called by the client
+* any time it handles a response from the broker that includes leader epoch, except for UpdateMetadata which
+* follows a different code path ({@link #update}).
+*
+* @param topicPartition
+* @param leaderEpoch
+* @return true if we updated the last seen epoch, false otherwise
*/
public synchronized boolean updateLastSeenEpochIfNewer(TopicPartition topicPartition, int leaderEpoch) {
Objects.requireNonNull(topicPartition, "TopicPartition cannot be null");
if (leaderEpoch < 0)
throw new IllegalArgumentException("Invalid leader epoch " + leaderEpoch + " (must be non-negative)");
-boolean updated = updateLastSeenEpoch(topicPartition, leaderEpoch, oldEpoch -> leaderEpoch > oldEpoch);
+Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
+log.trace("Determining if we should replace existing epoch {} with new epoch {} for partition {}", oldEpoch, leaderEpoch, topicPartition);
+final boolean updated;
+if (oldEpoch == null) {
+log.debug("Not replacing null epoch with new epoch {} for partition {}", leaderEpoch, topicPartition);
+updated = false;
+} else if (leaderEpoch > oldEpoch) {
+log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch, leaderEpoch, topicPartition);
+lastSeenLeaderEpochs.put(topicPartition, leaderEpoch);
+updated = true;
+} else {
+log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, leaderEpoch, topicPartition);
+updated = false;
+}
this.needFullUpdate = this.needFullUpdate || updated;
return updated;
}
@@ -172,29 +192,6 @@ public class Metadata implements Closeable {
return Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition));
}
-/**
-* Conditionally update the leader epoch for a partition
-*
-* @param topicPartition topic+partition to update the epoch for
-* @param epoch the new epoch
-* @param epochTest a predicate to determine if the old epoch should be replaced
-* @return true if the epoch was updated, false otherwise
-*/
-private synchronized boolean updateLastSeenEpoch(TopicPartition topicPartition,
-int epoch,
-Predicate<Integer> epochTest) {
-Integer oldEpoch = lastSeenLeaderEpochs.get(topicPartition);
-log.trace("Determining if we should replace existing epoch {} with new epoch {}", oldEpoch, epoch);
-if (oldEpoch == null || epochTest.test(oldEpoch)) {
-log.debug("Updating last seen epoch from {} to {} for partition {}", oldEpoch, epoch, topicPartition);
-lastSeenLeaderEpochs.put(topicPartition, epoch);
-return true;
-} else {
-log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, epoch, topicPartition);
-return false;
-}
-}
/**
* Check whether an update has been explicitly requested.
*
@@ -373,10 +370,14 @@ public class Metadata implements Closeable {
if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
int newEpoch = partitionMetadata.leaderEpoch.get();
// If the received leader epoch is at least the same as the previous one, update the metadata
-if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch)) {
+Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
+if (currentEpoch == null || newEpoch >= currentEpoch) {
+log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
+lastSeenLeaderEpochs.put(tp, newEpoch);
return Optional.of(partitionMetadata);
} else {
// Otherwise ignore the new metadata and use the previously cached info
log.debug("Got metadata for an older epoch {} (current is {}) for partition {}, not updating", newEpoch, currentEpoch, tp);
return cache.partitionMetadata(tp);
}
} else {
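The behavioral change in updateLastSeenEpochIfNewer above is the null branch: an out-of-band epoch is now ignored when no epoch has previously been seen for the partition, where the old predicate path would have inserted it. A usage sketch, assuming an already-constructed Metadata instance `metadata` whose cache has no epoch recorded for the partition:

TopicPartition tp = new TopicPartition("topic", 0);
// No epoch on record: the call is a no-op and returns false.
boolean updated = metadata.updateLastSeenEpochIfNewer(tp, 5);    // false
// The epoch is not stored either, so nothing is present afterwards.
assert !metadata.lastSeenLeaderEpoch(tp).isPresent();
// Once a metadata response establishes an epoch, only strictly newer epochs
// return true (see testUpdateLastEpoch in MetadataTest below).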

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (9 lines changed)

@@ -486,7 +486,7 @@ public class Fetcher<K, V> implements Closeable {
// Validate each partition against the current leader and epoch
subscriptions.assignedPartitions().forEach(topicPartition -> {
ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
-subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch);
+subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch);
});
// Collect positions needing validation, with backoff
@@ -756,7 +756,7 @@ public class Fetcher<K, V> implements Closeable {
}
}
-private boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
+static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
if (apiVersion == null)
return false;
@@ -1101,8 +1101,9 @@ public class Fetcher<K, V> implements Closeable {
Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
// Ensure the position has an up-to-date leader
-subscriptions.assignedPartitions().forEach(
-tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.currentLeader(tp)));
+subscriptions.assignedPartitions().forEach(tp ->
+subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp, metadata.currentLeader(tp))
+);
long currentTimeMs = time.milliseconds();
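The hunk above cuts off the body of hasUsableOffsetForLeaderEpochVersion after the null check. Judging from the test further down that registers OFFSET_FOR_LEADER_EPOCH versions 0-2 and expects validation to be skipped, the remainder compares the broker's maximum supported version against a minimum usable one; a sketch under that assumption (the exact threshold and any helper it delegates to are not shown in this diff):

static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
    ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
    if (apiVersion == null)
        return false;
    // Assumed cutoff: v3 added the current leader epoch to the request (KIP-320),
    // which is what makes the RPC usable for fencing-aware offset validation.
    return apiVersion.maxVersion >= 3;
}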

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java (29 lines changed)

@@ -16,7 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -45,6 +47,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;
+import static org.apache.kafka.clients.consumer.internals.Fetcher.hasUsableOffsetForLeaderEpochVersion;
/**
* A class for tracking the topics, partitions, and offsets for the consumer. A partition
* is "assigned" either directly with {@link #assignFromUser(Set)} (manual assignment)
@@ -422,8 +426,29 @@ public class SubscriptionState {
assignedState(tp).position(position);
}
-public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) {
-return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+/**
+* Enter the offset validation state if the leader for this partition is known to support a usable version of the
+* OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete the offset validation.
+*
+* @param apiVersions
+* @param tp
+* @param leaderAndEpoch
+* @return true if we enter the offset validation state
+*/
+public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition tp,
+Metadata.LeaderAndEpoch leaderAndEpoch) {
+if (leaderAndEpoch.leader.isPresent()) {
+NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
+if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+} else {
+// If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
+completeValidation(tp);
+return false;
+}
+} else {
+return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+}
}
/**
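Taken together, the new method has three outcomes: with an unknown leader, or a known leader whose ApiVersions are absent or usable, it delegates to maybeValidatePosition as before; with a known leader on an old API it completes validation immediately and returns false. A condensed usage sketch (`state`, `apiVersions`, `broker1`, and `tp0` assumed set up as in SubscriptionStateTest below):

// Old OffsetsForLeaderEpoch (v0-v2): validation is bypassed and fetching can proceed.
apiVersions.update(broker1.idString(), NodeApiVersions.create(
        ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
assert !state.maybeValidatePositionForCurrentLeader(apiVersions, tp0,
        new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10)));

// Current API: the position parks in AWAITING_VALIDATION until an
// OffsetsForLeaderEpoch round trip confirms it.
apiVersions.update(broker1.idString(), NodeApiVersions.create());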

clients/src/test/java/org/apache/kafka/clients/MetadataTest.java (73 lines changed)

@@ -313,6 +313,10 @@ public class MetadataTest {
boolean[] updateResult = {true, false, false, false, false, true, false, false, false, true};
TopicPartition tp = new TopicPartition("topic", 0);
+MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1,
+Collections.emptyMap(), Collections.singletonMap("topic", 1), _tp -> 0);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
for (int i = 0; i < epochs.length; i++) {
metadata.updateLastSeenEpochIfNewer(tp, epochs[i]);
if (updateResult[i]) {
@@ -325,6 +329,46 @@ public class MetadataTest {
}
}
+@Test
+public void testUpdateLastEpoch() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+MetadataResponse metadataResponse = emptyMetadataResponse();
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+// if we have no leader epoch, this call shouldn't do anything
+assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0));
+assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
+assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 2));
+assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
+// Metadata with newer epoch is handled
+metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+// Don't update to an older one
+assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
+assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+// Don't cause update if it's the same one
+assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 10));
+assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+// Update if we see newer epoch
+assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 12));
+assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+// Don't overwrite metadata with older epoch
+metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 11);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+}
@Test
public void testRejectOldMetadata() {
Map<String, Integer> partitionCounts = new HashMap<>();
@@ -378,26 +422,6 @@ public class MetadataTest {
}
}
-@Test
-public void testMaybeRequestUpdate() {
-TopicPartition tp = new TopicPartition("topic-1", 0);
-metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L);
-assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 1));
-assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 1L);
-assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 1));
-assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 2L);
-assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 0));
-assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 1);
-metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 3L);
-assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 2));
-assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 2);
-}
@Test
public void testOutOfBandEpochUpdate() {
Map<String, Integer> partitionCounts = new HashMap<>();
@@ -406,7 +430,7 @@ public class MetadataTest {
metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L);
-assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 99));
+assertFalse(metadata.updateLastSeenEpochIfNewer(tp, 99));
// Update epoch to 100
MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100);
@@ -414,7 +438,7 @@ public class MetadataTest {
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
-// Simulate a leader epoch from another response, like a fetch response (not yet implemented)
+// Simulate a leader epoch from another response, like a fetch response or list offsets
assertTrue(metadata.updateLastSeenEpochIfNewer(tp, 101));
// Cache of partition stays, but current partition info is not available since it's stale
@@ -454,6 +478,11 @@ public class MetadataTest {
assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
assertEquals(0, metadata.partitionMetadataIfCurrent(tp).get().partition());
assertEquals(Optional.of(0), metadata.partitionMetadataIfCurrent(tp).get().leaderId);
+// Since epoch was null, this shouldn't update it
+metadata.updateLastSeenEpochIfNewer(tp, 10);
+assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+assertFalse(metadata.partitionMetadataIfCurrent(tp).get().leaderEpoch.isPresent());
}
@Test

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (104 lines changed)

@@ -1593,6 +1593,53 @@ public class FetcherTest {
assertEquals(5, subscriptions.position(tp0).offset);
}
+@Test
+public void testListOffsetNoUpdateMissingEpoch() {
+buildFetcher();
+// Set up metadata with no leader epoch
+subscriptions.assignFromUser(singleton(tp0));
+MetadataResponse metadataWithNoLeaderEpochs = TestUtils.metadataUpdateWith(
+"kafka-cluster", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp -> null);
+client.updateMetadata(metadataWithNoLeaderEpochs);
+// Return a ListOffsets response with leaderEpoch=1; we should ignore it
+subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 1));
+fetcher.resetOffsetsIfNeeded();
+consumerClient.pollNoWakeup();
+// Reset should be satisfied and no metadata update requested
+assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+assertFalse(metadata.updateRequested());
+assertFalse(metadata.lastSeenLeaderEpoch(tp0).isPresent());
+}
+@Test
+public void testListOffsetUpdateEpoch() {
+buildFetcher();
+// Set up metadata with leaderEpoch=1
+subscriptions.assignFromUser(singleton(tp0));
+MetadataResponse metadataWithLeaderEpochs = TestUtils.metadataUpdateWith(
+"kafka-cluster", 1, Collections.emptyMap(), singletonMap(topicName, 4), tp -> 1);
+client.updateMetadata(metadataWithLeaderEpochs);
+// Reset offsets to trigger ListOffsets call
+subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+// Now we see a ListOffsets response with leaderEpoch=2, so we trigger a metadata update
+client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP, 1),
+listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 2));
+fetcher.resetOffsetsIfNeeded();
+consumerClient.pollNoWakeup();
+assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+assertTrue(metadata.updateRequested());
+assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> assertEquals((long) epoch, 2));
+}
@Test
public void testUpdateFetchPositionDisconnect() {
buildFetcher();
@@ -3673,18 +3720,35 @@ public class FetcherTest {
apiVersions.update(node.idString(), NodeApiVersions.create(
ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
-// Seek with a position and leader+epoch
-Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
-metadata.currentLeader(tp0).leader, Optional.of(epochOne));
-subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
+{
+// Seek with a position and leader+epoch
+Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
-// Update metadata to epoch=2, enter validation
-metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
-Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
-fetcher.validateOffsetsIfNeeded();
+// Update metadata to epoch=2, enter validation
+metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
+fetcher.validateOffsetsIfNeeded();
-// Offset validation is skipped
-assertFalse(subscriptions.awaitingValidation(tp0));
+// Offset validation is skipped
+assertFalse(subscriptions.awaitingValidation(tp0));
+}
+{
+// Seek with a position and leader+epoch
+Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
+// Update metadata to epoch=2, enter validation
+metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+Collections.emptyMap(), partitionCounts, tp -> epochTwo), false, 0L);
+// Subscription should not stay in AWAITING_VALIDATION in prepareFetchRequest
+assertEquals(1, fetcher.sendFetches());
+assertFalse(subscriptions.awaitingValidation(tp0));
+}
}
@Test
@@ -3764,7 +3828,7 @@ public class FetcherTest {
Optional.of(epochTwo),
new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochTwo)));
subscriptions.position(tp0, nextPosition);
-subscriptions.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree)));
+subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree)));
// Prepare offset list response from async validation with epoch=2
Map<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<>();
@@ -4065,13 +4129,27 @@ public class FetcherTest {
};
}
+private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp, final int leaderEpoch) {
+// matches a list offset request with the provided timestamp and leader epoch
+return body -> {
+ListOffsetRequest req = (ListOffsetRequest) body;
+return req.partitionTimestamps().equals(Collections.singletonMap(
+tp0, new ListOffsetRequest.PartitionData(timestamp, Optional.of(leaderEpoch))));
+};
+}
private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
return listOffsetResponse(tp0, error, timestamp, offset);
}
private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
-ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp, offset,
-Optional.empty());
+return listOffsetResponse(tp, error, timestamp, offset, null);
}
+private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset,
+Integer leaderEpoch) {
+ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(
+error, timestamp, offset, Optional.ofNullable(leaderEpoch));
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
allPartitionData.put(tp, partitionData);
return new ListOffsetResponse(allPartitionData);

clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java (46 lines changed)

@@ -16,12 +16,15 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
@@ -382,13 +385,15 @@ public class SubscriptionStateTest {
new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(5))));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
+ApiVersions apiVersions = new ApiVersions();
+apiVersions.update(broker1.idString(), NodeApiVersions.create());
-assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.empty())));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
-assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.of(10))));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
@@ -414,6 +419,9 @@ public class SubscriptionStateTest {
@Test
public void testSeekUnvalidatedWithOffsetEpoch() {
Node broker1 = new Node(1, "localhost", 9092);
+ApiVersions apiVersions = new ApiVersions();
+apiVersions.update(broker1.idString(), NodeApiVersions.create());
state.assignFromUser(Collections.singleton(tp0));
state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2),
@@ -422,19 +430,19 @@ public class SubscriptionStateTest {
assertTrue(state.awaitingValidation(tp0));
// Update using the current leader and epoch
-assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.of(5))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
// Update with a newer leader and epoch
-assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.of(15))));
assertFalse(state.hasValidPosition(tp0));
assertTrue(state.awaitingValidation(tp0));
// If the updated leader has no epoch information, then skip validation and begin fetching
-assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(
+assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
Optional.of(broker1), Optional.empty())));
assertTrue(state.hasValidPosition(tp0));
assertFalse(state.awaitingValidation(tp0));
@@ -510,6 +518,34 @@ public class SubscriptionStateTest {
assertEquals(initialPosition, state.position(tp0));
}
+@Test
+public void testMaybeValidatePositionForCurrentLeader() {
+NodeApiVersions oldApis = NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2);
+ApiVersions apiVersions = new ApiVersions();
+apiVersions.update("1", oldApis);
+Node broker1 = new Node(1, "localhost", 9092);
+state.assignFromUser(Collections.singleton(tp0));
+state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
+new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
+// if API is too old to be usable, we just skip validation
+assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
+Optional.of(broker1), Optional.of(10))));
+assertTrue(state.hasValidPosition(tp0));
+// New API
+apiVersions.update("1", NodeApiVersions.create());
+state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5),
+new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(10))));
+// with the new API version we enter the validation state instead of skipping it
+assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, new Metadata.LeaderAndEpoch(
+Optional.of(broker1), Optional.of(10))));
+assertFalse(state.hasValidPosition(tp0));
+}
@Test
public void testMaybeCompleteValidationAfterPositionChange() {
Node broker1 = new Node(1, "localhost", 9092);
