
MINOR - KAFKA-15550: Validation for negative target times in offsetsForTimes (#14503)

The current KafkaConsumer offsetsForTimes fails with an IllegalArgumentException if negative target timestamps are provided as arguments. This change adds the same validation and tests to the new consumer implementation (and improves some comments in updateFetchPositions).
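For illustration, a minimal caller-side sketch of the contract being enforced (a hedged sketch, not part of the diff: it assumes a reachable broker at localhost:9092, and the topic name is made up):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetsForTimesContract {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative address
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("topic1", 0);

                // Non-negative target times are looked up normally.
                consumer.offsetsForTimes(Collections.singletonMap(tp, 0L), Duration.ofSeconds(5));

                // Negative target times are rejected before any request is sent:
                // throws IllegalArgumentException("The target time for partition topic1-0 is -1.
                // The target time cannot be negative.")
                consumer.offsetsForTimes(Collections.singletonMap(tp, -1L), Duration.ofSeconds(5));
            }
        }
    }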

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Lianet Magrans, 1 year ago, committed by GitHub
commit 58dfa1cc81
3 changed files:
1. checkstyle/suppressions.xml (2)
2. clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java (28)
3. clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java (20)

checkstyle/suppressions.xml (2)

@@ -48,7 +48,7 @@
 <suppress id="dontUseSystemExit"
           files="Exit.java"/>
 <suppress checks="ClassFanOutComplexity"
-          files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
+          files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|PrototypeAsyncConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
 <suppress checks="ClassFanOutComplexity"
           files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
 <suppress checks="NPath"

clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java (28)

@@ -235,15 +235,18 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
  * defined
  */
 private boolean updateFetchPositionsIfNeeded(final Timer timer) {
-    // If any partitions have been truncated due to a leader change, we need to validate the offsets
+    // Validate positions using the partition leader end offsets, to detect if any partition
+    // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+    // request, retrieve the partition end offsets, and validate the current position against it.
     ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
     eventHandler.add(validatePositionsEvent);
-    // If there are any partitions which do not have a valid position and are not
-    // awaiting reset, then we need to fetch committed offsets. We will only do a
-    // coordinator lookup if there are partitions which have missing positions, so
-    // a consumer with manually assigned partitions can avoid a coordinator dependence
-    // by always ensuring that assigned partitions have an initial position.
+    // Reset positions using committed offsets retrieved from the group coordinator, for any
+    // partitions which do not have a valid position and are not awaiting reset. This will
+    // trigger an OffsetFetch request and update positions with the offsets retrieved. This
+    // will only do a coordinator lookup if there are partitions which have missing
+    // positions, so a consumer with manually assigned partitions can avoid a coordinator
+    // dependence by always ensuring that assigned partitions have an initial position.
     if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer))
         return false;
@@ -252,8 +255,10 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
     // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
     subscriptions.resetInitializingPositions();
-    // Finally send an asynchronous request to look up and update the positions of any
-    // partitions which are awaiting reset.
+    // Reset positions using partition offsets retrieved from the leader, for any partitions
+    // which are awaiting reset. This will trigger a ListOffset request, retrieve the
+    // partition offsets according to the strategy (ex. earliest, latest), and update the
+    // positions.
     ResetPositionsApplicationEvent resetPositionsEvent = new ResetPositionsApplicationEvent();
     eventHandler.add(resetPositionsEvent);
     return true;
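Taken together, the comments above describe a three-step flow, one broker RPC per step. The following runnable sketch paraphrases that control flow; PositionsEvent, eventQueue, and the boolean parameters are hypothetical stand-ins invented for illustration (the real method enqueues ValidatePositionsApplicationEvent and ResetPositionsApplicationEvent and takes a Timer):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class UpdateFetchPositionsFlow {

        // Hypothetical stand-in for the application events the real consumer
        // enqueues for processing by its background thread.
        enum PositionsEvent { VALIDATE_POSITIONS, RESET_POSITIONS }

        private final Deque<PositionsEvent> eventQueue = new ArrayDeque<>();

        // Paraphrase of updateFetchPositionsIfNeeded:
        // 1. validate positions against leader end offsets (OffsetsForLeaderEpoch),
        // 2. refresh missing positions from committed offsets (OffsetFetch),
        // 3. reset remaining positions per the configured strategy (ListOffsets).
        boolean updateFetchPositionsIfNeeded(boolean committedOffsetsEnabled,
                                             boolean committedOffsetsRefreshed) {
            eventQueue.add(PositionsEvent.VALIDATE_POSITIONS);  // step 1
            if (committedOffsetsEnabled && !committedOffsetsRefreshed)
                return false;                                   // step 2 did not complete in time
            eventQueue.add(PositionsEvent.RESET_POSITIONS);     // step 3
            return true;
        }

        public static void main(String[] args) {
            UpdateFetchPositionsFlow flow = new UpdateFetchPositionsFlow();
            System.out.println(flow.updateFetchPositionsIfNeeded(true, true)); // prints: true
        }
    }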
@@ -443,6 +448,13 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
     // Keeping same argument validation error thrown by the current consumer implementation
     // to avoid API level changes.
     requireNonNull(timestampsToSearch, "Timestamps to search cannot be null");
+    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+        // Exclude the earliest and latest offset here so the timestamp in the returned
+        // OffsetAndTimestamp is always positive.
+        if (entry.getValue() < 0)
+            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
+                entry.getValue() + ". The target time cannot be negative.");
+    }
     if (timestampsToSearch.isEmpty()) {
         return Collections.emptyMap();
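The comment above ("Exclude the earliest and latest offset...") alludes to the fact that negative timestamps double as reserved sentinels in the ListOffsets protocol, which is why they can never be valid target times. The constants below exist in org.apache.kafka.common.requests.ListOffsetsRequest (they are also used by the new test further down); the printout itself is just an illustration, not part of the diff:

    import org.apache.kafka.common.requests.ListOffsetsRequest;

    public class ListOffsetsSentinels {
        public static void main(String[] args) {
            // Reserved negative timestamps used internally by the ListOffsets API;
            // rejecting negative target times keeps user lookups from colliding with them.
            System.out.println(ListOffsetsRequest.EARLIEST_TIMESTAMP); // -2
            System.out.println(ListOffsetsRequest.LATEST_TIMESTAMP);   // -1
            System.out.println(ListOffsetsRequest.MAX_TIMESTAMP);      // -3
        }
    }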

clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java (20)

@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -322,6 +323,25 @@ public class PrototypeAsyncConsumerTest {
             Duration.ofMillis(1)));
     }
+
+    @Test
+    public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
+        PrototypeAsyncConsumer<?, ?> consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+        assertThrows(IllegalArgumentException.class,
+            () -> consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(
+                "topic1", 1), ListOffsetsRequest.EARLIEST_TIMESTAMP),
+                Duration.ofMillis(1)));
+        assertThrows(IllegalArgumentException.class,
+            () -> consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(
+                "topic1", 1), ListOffsetsRequest.LATEST_TIMESTAMP),
+                Duration.ofMillis(1)));
+        assertThrows(IllegalArgumentException.class,
+            () -> consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(
+                "topic1", 1), ListOffsetsRequest.MAX_TIMESTAMP),
+                Duration.ofMillis(1)));
+    }

     @Test
     public void testOffsetsForTimes() {
         PrototypeAsyncConsumer<?, ?> consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
