
KAFKA-14274 [2-5/7]: Introduction of more infrastructure for forthcoming fetch request manager (#14359)

This continues laying the groundwork for the fetch refactoring by
introducing some new classes and refactoring the existing code to use
the new classes where applicable.

Changes:

* Minor cleanup of the events classes to make their data immutable
  and private and to implement toString().
* Added IdempotentCloser, which prevents a resource from being closed
  more than once. It's general enough that it could be used elsewhere
  in the project, but it's limited to the consumer internals for now.
  A usage sketch follows this list.
* Split core Fetcher code into classes to buffer raw results
  (FetchBuffer) and to collect raw results into ConsumerRecords
  (FetchCollector). These can be tested and changed in isolation from
  the core fetcher logic; a second sketch after this list shows how
  they are intended to fit together.
* Added NodeStatusDetector which abstracts methods from
  ConsumerNetworkClient so that it and NetworkClientDelegate can be
  used in AbstractFetch via the interface instead of using
  ConsumerNetworkClient directly.
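
A minimal usage sketch of the new IdempotentCloser, modeled on how FetchBuffer.close()
uses it in this patch. The Resource class and its messages are hypothetical and only
illustrate the close-once contract.

    import org.apache.kafka.common.internals.IdempotentCloser;

    import java.io.Closeable;

    public class Resource implements Closeable {

        private final IdempotentCloser idempotentCloser = new IdempotentCloser();

        @Override
        public void close() {
            // The first call to close() runs the first callback; any subsequent call
            // only runs the second callback, so underlying resources are released once.
            idempotentCloser.close(
                () -> System.out.println("Releasing underlying resources"),
                () -> System.out.println("close() called more than once; ignoring"));
        }
    }

A hedged sketch of how FetchBuffer and FetchCollector are meant to work together, based
on the constructors and methods added below. The collaborators (metadata, subscriptions,
fetch config, metrics manager, and the incoming CompletedFetch) are assumed to come from
the consumer's existing construction path and are not built here; the class and method
names are hypothetical, and the sketch is declared in the internals package because these
are internal, package-visible APIs.

    package org.apache.kafka.clients.consumer.internals;

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.common.utils.Time;

    public class FetchFlowSketch {

        public static <K, V> Fetch<K, V> bufferAndCollect(LogContext logContext,
                                                          ConsumerMetadata metadata,
                                                          SubscriptionState subscriptions,
                                                          FetchConfig<K, V> fetchConfig,
                                                          FetchMetricsManager metricsManager,
                                                          Time time,
                                                          CompletedFetch completedFetch) {
            // Raw results are buffered as fetch responses arrive from the brokers...
            FetchBuffer fetchBuffer = new FetchBuffer(logContext);
            fetchBuffer.add(completedFetch);

            // ...and later collected into ConsumerRecords on the poll path.
            FetchCollector<K, V> collector = new FetchCollector<>(
                    logContext, metadata, subscriptions, fetchConfig, metricsManager, time);
            return collector.collectFetch(fetchBuffer);
        }
    }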

Reviewers: Jun Rao <junrao@gmail.com>
Authored by Kirk True, committed by GitHub
parent commit e1dc6d9f34
  1. checkstyle/suppressions.xml (2)
  2. clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java (8)
  3. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (12)
  4. clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java (517)
  5. clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java (40)
  6. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (2)
  7. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java (25)
  8. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java (1)
  9. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java (1)
  10. clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java (149)
  11. clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java (372)
  12. clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java (53)
  13. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (35)
  14. clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java (9)
  15. clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java (5)
  16. clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java (11)
  17. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java (44)
  18. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java (11)
  19. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java (48)
  20. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java (46)
  21. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java (54)
  22. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java (31)
  23. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java (41)
  24. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java (3)
  25. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java (7)
  26. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java (38)
  27. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopBackgroundEvent.java (42)
  28. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java (6)
  29. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java (39)
  30. clients/src/main/java/org/apache/kafka/common/internals/IdempotentCloser.java (174)
  31. clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java (216)
  32. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java (194)
  33. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java (579)
  34. clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java (2)
  35. clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java (79)
  36. clients/src/test/java/org/apache/kafka/common/internals/IdempotentCloserTest.java (183)

checkstyle/suppressions.xml (2)

@@ -87,7 +87,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
files="(AbstractFetch|ConsumerCoordinator|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
files="(AbstractFetch|ConsumerCoordinator|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>

clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java (8)

@@ -132,4 +132,12 @@ public final class NetworkClientUtils {
if (exception != null)
throw exception;
}
/**
* Initiate a connection if currently possible. This is only really useful for resetting the
* failed status of a socket.
*/
public static void tryConnect(KafkaClient client, Node node, Time time) {
client.ready(node, time.milliseconds());
}
}

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (12)

@@ -918,7 +918,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
throwIfNoAssignorsConfigured();
fetcher.clearBufferedDataForUnassignedTopics(topics);
// Clear the buffered data which are not a part of newly assigned topics
final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
for (TopicPartition tp : subscriptions.assignedPartitions()) {
if (topics.contains(tp.topic()))
currentTopicPartitions.add(tp);
}
fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
log.info("Subscribed to topic(s): {}", join(topics, ", "));
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
metadata.requestUpdateForNewTopics();

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java (517)

@@ -18,20 +18,14 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -43,20 +37,17 @@ import org.slf4j.Logger;
import org.slf4j.helpers.MessageFormatter;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
import static org.apache.kafka.clients.consumer.internals.FetchUtils.requestMetadataUpdate;
/**
* {@code AbstractFetch} represents the basic state and logic for record fetching processing.
@@ -73,13 +64,12 @@ public abstract class AbstractFetch<K, V> implements Closeable {
protected final FetchConfig<K, V> fetchConfig;
protected final Time time;
protected final FetchMetricsManager metricsManager;
protected final FetchBuffer fetchBuffer;
protected final BufferSupplier decompressionBufferSupplier;
protected final Set<Integer> nodesWithPendingFetchRequests;
protected final IdempotentCloser idempotentCloser = new IdempotentCloser();
private final BufferSupplier decompressionBufferSupplier;
private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
private final Map<Integer, FetchSessionHandler> sessionHandlers;
private final Set<Integer> nodesWithPendingFetchRequests;
private CompletedFetch<K, V> nextInLineFetch;
public AbstractFetch(final LogContext logContext,
final ConsumerNetworkClient client,
@@ -95,7 +85,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
this.subscriptions = subscriptions;
this.fetchConfig = fetchConfig;
this.decompressionBufferSupplier = BufferSupplier.create();
this.completedFetches = new ConcurrentLinkedQueue<>();
this.fetchBuffer = new FetchBuffer(logContext);
this.sessionHandlers = new HashMap<>();
this.nodesWithPendingFetchRequests = new HashSet<>();
this.metricsManager = metricsManager;
@@ -109,7 +99,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
* @return true if there are completed fetches, false otherwise
*/
boolean hasCompletedFetches() {
return !completedFetches.isEmpty();
return !fetchBuffer.isEmpty();
}
/**
@@ -117,7 +107,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
* @return true if there are completed fetches that can be returned, false otherwise
*/
public boolean hasAvailableFetches() {
return completedFetches.stream().anyMatch(fetch -> subscriptions.isFetchable(fetch.partition));
return fetchBuffer.hasCompletedFetches(fetch -> subscriptions.isFetchable(fetch.partition));
}
/**
@@ -181,17 +171,16 @@ public abstract class AbstractFetch<K, V> implements Closeable {
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
fetchConfig.isolationLevel, fetchOffset, partition, partitionData);
CompletedFetch<K, V> completedFetch = new CompletedFetch<>(
CompletedFetch completedFetch = new CompletedFetch(
logContext,
subscriptions,
fetchConfig,
decompressionBufferSupplier,
partition,
partitionData,
metricAggregator,
fetchOffset,
requestVersion);
completedFetches.add(completedFetch);
fetchBuffer.add(completedFetch);
}
metricsManager.recordLatency(resp.requestLatencyMs());
@@ -205,14 +194,14 @@ public abstract class AbstractFetch<K, V> implements Closeable {
* Implements the core logic for a failed fetch request/response.
*
* @param fetchTarget {@link Node} from which the fetch data was requested
* @param e {@link RuntimeException} representing the error that resulted in the failure
* @param t {@link Throwable} representing the error that resulted in the failure
*/
protected void handleFetchResponse(final Node fetchTarget, final RuntimeException e) {
protected void handleFetchResponse(final Node fetchTarget, final Throwable t) {
try {
final FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler != null) {
handler.handleError(e);
handler.handleError(t);
handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica);
}
} finally {
@@ -221,6 +210,20 @@ public abstract class AbstractFetch<K, V> implements Closeable {
}
}
protected void handleCloseFetchSessionResponse(final Node fetchTarget,
final FetchSessionHandler.FetchRequestData data) {
int sessionId = data.metadata().sessionId();
log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
}
public void handleCloseFetchSessionResponse(final Node fetchTarget,
final FetchSessionHandler.FetchRequestData data,
final Throwable t) {
int sessionId = data.metadata().sessionId();
log.debug("Unable to a close message for fetch session: {} to node: {}. " +
"This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, t);
}
/**
* Creates a new {@link FetchRequest fetch request} in preparation for sending to the Kafka cluster.
*
@@ -256,138 +259,23 @@ public abstract class AbstractFetch<K, V> implements Closeable {
}
/**
* Return the fetched records, empty the record buffer and update the consumed position.
*
* </p>
*
* NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the consumed position is not updated.
* Return the list of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
* but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
* has yet to process the data for the partition that has already been fetched, we should not go send for more data
* until the previously-fetched data has been processed.
*
* @return A {@link Fetch} for the requested partitions
* @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
* @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
* @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
*/
public Fetch<K, V> collectFetch() {
Fetch<K, V> fetch = Fetch.empty();
Queue<CompletedFetch<K, V>> pausedCompletedFetches = new ArrayDeque<>();
int recordsRemaining = fetchConfig.maxPollRecords;
private Set<TopicPartition> fetchablePartitions() {
// This is the set of partitions we have in our buffer
Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();
try {
while (recordsRemaining > 0) {
if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
CompletedFetch<K, V> records = completedFetches.peek();
if (records == null) break;
if (!records.isInitialized()) {
try {
nextInLineFetch = initializeCompletedFetch(records);
} catch (Exception e) {
// Remove a completedFetch upon a parse with exception if (1) it contains no records, and
// (2) there are no fetched records with actual content preceding this exception.
// The first condition ensures that the completedFetches is not stuck with the same completedFetch
// in cases such as the TopicAuthorizationException, and the second condition ensures that no
// potential data loss due to an exception in a following record.
if (fetch.isEmpty() && FetchResponse.recordsOrFail(records.partitionData).sizeInBytes() == 0) {
completedFetches.poll();
}
throw e;
}
} else {
nextInLineFetch = records;
}
completedFetches.poll();
} else if (subscriptions.isPaused(nextInLineFetch.partition)) {
// when the partition is paused we add the records back to the completedFetches queue instead of draining
// them so that they can be returned on a subsequent poll if the partition is resumed at that time
log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
pausedCompletedFetches.add(nextInLineFetch);
nextInLineFetch = null;
} else {
Fetch<K, V> nextFetch = fetchRecords(recordsRemaining);
recordsRemaining -= nextFetch.numRecords();
fetch.add(nextFetch);
}
}
} catch (KafkaException e) {
if (fetch.isEmpty())
throw e;
} finally {
// add any polled completed fetches for paused partitions back to the completed fetches queue to be
// re-evaluated in the next poll
completedFetches.addAll(pausedCompletedFetches);
}
// This is the test that returns true if the partition is *not* buffered
Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);
return fetch;
}
private Fetch<K, V> fetchRecords(final int maxRecords) {
if (!subscriptions.isAssigned(nextInLineFetch.partition)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it is no longer assigned",
nextInLineFetch.partition);
} else if (!subscriptions.isFetchable(nextInLineFetch.partition)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's
// poll call or if the offset is being reset
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
nextInLineFetch.partition);
} else {
SubscriptionState.FetchPosition position = subscriptions.position(nextInLineFetch.partition);
if (position == null) {
throw new IllegalStateException("Missing position for fetchable partition " + nextInLineFetch.partition);
}
if (nextInLineFetch.nextFetchOffset() == position.offset) {
List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(maxRecords);
log.trace("Returning {} fetched records at offset {} for assigned partition {}",
partRecords.size(), position, nextInLineFetch.partition);
boolean positionAdvanced = false;
if (nextInLineFetch.nextFetchOffset() > position.offset) {
SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(),
position.currentLeader);
log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
position, nextPosition, nextInLineFetch.partition, partRecords.size());
subscriptions.position(nextInLineFetch.partition, nextPosition);
positionAdvanced = true;
}
Long partitionLag = subscriptions.partitionLag(nextInLineFetch.partition, fetchConfig.isolationLevel);
if (partitionLag != null)
metricsManager.recordPartitionLag(nextInLineFetch.partition, partitionLag);
Long lead = subscriptions.partitionLead(nextInLineFetch.partition);
if (lead != null) {
metricsManager.recordPartitionLead(nextInLineFetch.partition, lead);
}
return Fetch.forPartition(nextInLineFetch.partition, partRecords, positionAdvanced);
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
nextInLineFetch.partition, nextInLineFetch.nextFetchOffset(), position);
}
}
log.trace("Draining fetched records for partition {}", nextInLineFetch.partition);
nextInLineFetch.drain();
return Fetch.empty();
}
private List<TopicPartition> fetchablePartitions() {
Set<TopicPartition> exclude = new HashSet<>();
if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
exclude.add(nextInLineFetch.partition);
}
for (CompletedFetch<K, V> completedFetch : completedFetches) {
exclude.add(completedFetch.partition);
}
return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
// Return all partitions that are in an otherwise fetchable state *and* for which we don't already have some
// messages sitting in our buffer.
return new HashSet<>(subscriptions.fetchablePartitions(isNotBuffered));
}
/**
@@ -421,7 +309,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
" using the leader instead.", nodeId, partition);
// Note that this condition may happen due to stale metadata, so we clear preferred replica and
// refresh metadata.
requestMetadataUpdate(partition);
requestMetadataUpdate(metadata, subscriptions, partition);
return leaderReplica;
}
} else {
@@ -429,6 +317,37 @@ public abstract class AbstractFetch<K, V> implements Closeable {
}
}
private Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessionRequests() {
final Cluster cluster = metadata.fetch();
Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
try {
sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
// set the session handler to notify close. This will set the next metadata request to send close message.
sessionHandler.notifyClose();
// FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
// skip sending the close request.
final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
return;
}
fetchable.put(fetchTarget, sessionHandler.newBuilder());
});
} finally {
sessionHandlers.clear();
}
Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>();
for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) {
reqs.put(entry.getKey(), entry.getValue().build());
}
return reqs;
}
/**
* Create fetch requests for all nodes for which we have assigned partitions
* that have no existing requests in flight.
@@ -493,262 +412,30 @@ public abstract class AbstractFetch<K, V> implements Closeable {
return reqs;
}
/**
* Initialize a CompletedFetch object.
*/
private CompletedFetch<K, V> initializeCompletedFetch(final CompletedFetch<K, V> completedFetch) {
final TopicPartition tp = completedFetch.partition;
final Errors error = Errors.forCode(completedFetch.partitionData.errorCode());
boolean recordMetrics = true;
try {
if (!subscriptions.hasValidPosition(tp)) {
// this can happen when a rebalance happened while fetch is still in-flight
log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
return null;
} else if (error == Errors.NONE) {
final CompletedFetch<K, V> ret = handleInitializeCompletedFetchSuccess(completedFetch);
recordMetrics = ret == null;
return ret;
} else {
handleInitializeCompletedFetchErrors(completedFetch, error);
return null;
}
} finally {
if (recordMetrics) {
completedFetch.recordAggregatedMetrics(0, 0);
}
if (error != Errors.NONE)
// we move the partition to the end if there was an error. This way, it's more likely that partitions for
// the same topic can remain together (allowing for more efficient serialization).
subscriptions.movePartitionToEnd(tp);
}
}
private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final CompletedFetch<K, V> completedFetch) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
SubscriptionState.FetchPosition position = subscriptions.position(tp);
if (position == null || position.offset != fetchOffset) {
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
return null;
}
final FetchResponseData.PartitionData partition = completedFetch.partitionData;
log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
FetchResponse.recordsSize(partition), tp, position);
Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator();
if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) {
if (completedFetch.requestVersion < 3) {
// Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
recordTooLargePartitions + " whose size is larger than the fetch size " + fetchConfig.fetchSize +
" and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
"newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
recordTooLargePartitions);
} else {
// This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
fetchOffset + ". Received a non-empty fetch response from the server, but no " +
"complete records were found.");
}
}
if (partition.highWatermark() >= 0) {
log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark());
subscriptions.updateHighWatermark(tp, partition.highWatermark());
}
if (partition.logStartOffset() >= 0) {
log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset());
subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
}
if (partition.lastStableOffset() >= 0) {
log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset());
subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
}
if (FetchResponse.isPreferredReplica(partition)) {
subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica(), () -> {
long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
tp, partition.preferredReadReplica(), expireTimeMs);
return expireTimeMs;
});
}
completedFetch.setInitialized();
return completedFetch;
}
private void handleInitializeCompletedFetchErrors(final CompletedFetch<K, V> completedFetch,
final Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();
if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
error == Errors.OFFSET_NOT_AVAILABLE) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
requestMetadataUpdate(tp);
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
requestMetadataUpdate(tp);
} else if (error == Errors.UNKNOWN_TOPIC_ID) {
log.warn("Received unknown topic ID error in fetch for partition {}", tp);
requestMetadataUpdate(tp);
} else if (error == Errors.INCONSISTENT_TOPIC_ID) {
log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
requestMetadataUpdate(tp);
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
SubscriptionState.FetchPosition position = subscriptions.position(tp);
if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
"does not match the current offset {}", tp, fetchOffset, position);
} else {
handleOffsetOutOfRange(position, tp);
}
} else {
log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
//we log the actual partition and not just the topic to help with ACL propagation issues in large clusters
log.warn("Not authorized to read from partition {}.", tp);
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown server error while fetching offset {} for topic-partition {}",
fetchOffset, tp);
} else if (error == Errors.CORRUPT_MESSAGE) {
throw new KafkaException("Encountered corrupt message when fetching offset "
+ fetchOffset
+ " for topic-partition "
+ tp);
} else {
throw new IllegalStateException("Unexpected error code "
+ error.code()
+ " while fetching at offset "
+ fetchOffset
+ " from topic-partition " + tp);
}
}
private void handleOffsetOutOfRange(final SubscriptionState.FetchPosition fetchPosition,
final TopicPartition topicPartition) {
String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;
if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(topicPartition);
} else {
log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Collections.singletonMap(topicPartition, fetchPosition.offset));
}
}
/**
* Clear the buffered data which are not a part of newly assigned partitions. Any previously
* {@link CompletedFetch fetched data} is dropped if it is for a partition that is no longer in
* {@code assignedPartitions}.
*
* @param assignedPartitions Newly-assigned {@link TopicPartition}
*/
public void clearBufferedDataForUnassignedPartitions(final Collection<TopicPartition> assignedPartitions) {
final Iterator<CompletedFetch<K, V>> completedFetchesItr = completedFetches.iterator();
while (completedFetchesItr.hasNext()) {
final CompletedFetch<K, V> completedFetch = completedFetchesItr.next();
final TopicPartition tp = completedFetch.partition;
if (!assignedPartitions.contains(tp)) {
log.debug("Removing {} from buffered data as it is no longer an assigned partition", tp);
completedFetch.drain();
completedFetchesItr.remove();
}
}
if (nextInLineFetch != null && !assignedPartitions.contains(nextInLineFetch.partition)) {
nextInLineFetch.drain();
nextInLineFetch = null;
}
}
/**
* Clear the buffered data which are not a part of newly assigned topics
*
* @param assignedTopics newly assigned topics
*/
public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
for (TopicPartition tp : subscriptions.assignedPartitions()) {
if (assignedTopics.contains(tp.topic())) {
currentTopicPartitions.add(tp);
}
}
clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
}
protected FetchSessionHandler sessionHandler(int node) {
return sessionHandlers.get(node);
}
// Visible for testing
void maybeCloseFetchSessions(final Timer timer) {
final Cluster cluster = metadata.fetch();
protected void maybeCloseFetchSessions(final Timer timer) {
final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareCloseFetchSessionRequests();
sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
// set the session handler to notify close. This will set the next metadata request to send close message.
sessionHandler.notifyClose();
final int sessionId = sessionHandler.sessionId();
// FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
// skip sending the close request.
final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
return;
}
final FetchRequest.Builder request = createFetchRequest(fetchTarget, sessionHandler.newBuilder().build());
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
final RequestFuture<ClientResponse> responseFuture = client.send(fetchTarget, request);
responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse value) {
log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
handleCloseFetchSessionResponse(fetchTarget, data);
}
@Override
public void onFailure(RuntimeException e) {
log.debug("Unable to a close message for fetch session: {} to node: {}. " +
"This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
handleCloseFetchSessionResponse(fetchTarget, data, e);
}
});
requestFutures.add(responseFuture);
});
}
// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
// all requests have received a response.
@@ -765,27 +452,35 @@ public abstract class AbstractFetch<K, V> implements Closeable {
}
}
public void close(final Timer timer) {
// we do not need to re-enable wakeups since we are closing already
client.disableWakeups();
if (nextInLineFetch != null) {
nextInLineFetch.drain();
nextInLineFetch = null;
}
// Visible for testing
protected FetchSessionHandler sessionHandler(int node) {
return sessionHandlers.get(node);
}
/**
* This method is called by {@link #close(Timer)}, which is guarded by the {@link IdempotentCloser} so that it is
* only executed the first time any of the {@link #close()} methods is called. Subclasses can override
* this method without the need for extra synchronization at the instance level.
*
* @param timer Timer to enforce time limit
*/
// Visible for testing
protected void closeInternal(Timer timer) {
// we do not need to re-enable wake-ups since we are closing already
client.disableWakeups();
maybeCloseFetchSessions(timer);
Utils.closeQuietly(fetchBuffer, "fetchBuffer");
Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier");
sessionHandlers.clear();
}
public void close(final Timer timer) {
idempotentCloser.close(() -> {
closeInternal(timer);
});
}
@Override
public void close() {
close(time.timer(0));
}
private void requestMetadataUpdate(final TopicPartition topicPartition) {
metadata.requestUpdate(false);
subscriptions.clearPreferredReadReplica(topicPartition);
}
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java (40)

@@ -52,12 +52,10 @@ import java.util.Set;
/**
* {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
* broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
*
* @param <K> Record key type
* @param <V> Record value type
* broker via a {@link FetchRequest}. It contains logic to maintain state between calls to
* {@link #fetchRecords(FetchConfig, int)}.
*/
public class CompletedFetch<K, V> {
public class CompletedFetch {
final TopicPartition partition;
final FetchResponseData.PartitionData partitionData;
@@ -65,7 +63,6 @@ public class CompletedFetch<K, V> {
private final Logger log;
private final SubscriptionState subscriptions;
private final FetchConfig<K, V> fetchConfig;
private final BufferSupplier decompressionBufferSupplier;
private final Iterator<? extends RecordBatch> batches;
private final Set<Long> abortedProducerIds;
@@ -86,7 +83,6 @@ public class CompletedFetch<K, V> {
CompletedFetch(LogContext logContext,
SubscriptionState subscriptions,
FetchConfig<K, V> fetchConfig,
BufferSupplier decompressionBufferSupplier,
TopicPartition partition,
FetchResponseData.PartitionData partitionData,
@@ -95,7 +91,6 @@ public class CompletedFetch<K, V> {
short requestVersion) {
this.log = logContext.logger(CompletedFetch.class);
this.subscriptions = subscriptions;
this.fetchConfig = fetchConfig;
this.decompressionBufferSupplier = decompressionBufferSupplier;
this.partition = partition;
this.partitionData = partitionData;
@@ -140,7 +135,7 @@ public class CompletedFetch<K, V> {
/**
* Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
* are closed. This is somewhat analogous to {@link Closeable#close() closing}, though no error will result if a
* caller invokes {@link #fetchRecords(int)}; an empty {@link List list} will be returned instead.
* caller invokes {@link #fetchRecords(FetchConfig, int)}; an empty {@link List list} will be returned instead.
*/
void drain() {
if (!isConsumed) {
@@ -156,7 +151,7 @@ public class CompletedFetch<K, V> {
}
}
private void maybeEnsureValid(RecordBatch batch) {
private <K, V> void maybeEnsureValid(FetchConfig<K, V> fetchConfig, RecordBatch batch) {
if (fetchConfig.checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
try {
batch.ensureValid();
@@ -167,7 +162,7 @@ public class CompletedFetch<K, V> {
}
}
private void maybeEnsureValid(Record record) {
private <K, V> void maybeEnsureValid(FetchConfig<K, V> fetchConfig, Record record) {
if (fetchConfig.checkCrcs) {
try {
record.ensureValid();
@@ -185,7 +180,7 @@ public class CompletedFetch<K, V> {
}
}
private Record nextFetchedRecord() {
private <K, V> Record nextFetchedRecord(FetchConfig<K, V> fetchConfig) {
while (true) {
if (records == null || !records.hasNext()) {
maybeCloseRecordStream();
@@ -204,7 +199,7 @@ public class CompletedFetch<K, V> {
currentBatch = batches.next();
lastEpoch = maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
maybeEnsureValid(currentBatch);
maybeEnsureValid(fetchConfig, currentBatch);
if (fetchConfig.isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
// remove from the aborted transaction queue all aborted transactions which have begun
@@ -230,7 +225,7 @@ public class CompletedFetch<K, V> {
// skip any records out of range
if (record.offset() >= nextFetchOffset) {
// we only do validation when the message should not be skipped.
maybeEnsureValid(record);
maybeEnsureValid(fetchConfig, record);
// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
@@ -250,10 +245,11 @@ public class CompletedFetch<K, V> {
* {@link Deserializer deserialization} of the {@link Record record's} key and value are performed in
* this step.
*
* @param fetchConfig {@link FetchConfig Configuration} to use, including, but not limited to, {@link Deserializer}s
* @param maxRecords The number of records to return; the number returned may be {@code 0 <= maxRecords}
* @return {@link ConsumerRecord Consumer records}
*/
List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
<K, V> List<ConsumerRecord<K, V>> fetchRecords(FetchConfig<K, V> fetchConfig, int maxRecords) {
// Error when fetching the next record before deserialization.
if (corruptLastRecord)
throw new KafkaException("Received exception when fetching the next record from " + partition
@@ -271,7 +267,7 @@ public class CompletedFetch<K, V> {
// use the last record to do deserialization again.
if (cachedRecordException == null) {
corruptLastRecord = true;
lastRecord = nextFetchedRecord();
lastRecord = nextFetchedRecord(fetchConfig);
corruptLastRecord = false;
}
@@ -280,7 +276,7 @@ public class CompletedFetch<K, V> {
Optional<Integer> leaderEpoch = maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
TimestampType timestampType = currentBatch.timestampType();
ConsumerRecord<K, V> record = parseRecord(partition, leaderEpoch, timestampType, lastRecord);
ConsumerRecord<K, V> record = parseRecord(fetchConfig, partition, leaderEpoch, timestampType, lastRecord);
records.add(record);
recordsRead++;
bytesRead += lastRecord.sizeInBytes();
@@ -306,10 +302,11 @@ public class CompletedFetch<K, V> {
/**
* Parse the record entry, deserializing the key / value fields if necessary
*/
ConsumerRecord<K, V> parseRecord(TopicPartition partition,
Optional<Integer> leaderEpoch,
TimestampType timestampType,
Record record) {
<K, V> ConsumerRecord<K, V> parseRecord(FetchConfig<K, V> fetchConfig,
TopicPartition partition,
Optional<Integer> leaderEpoch,
TimestampType timestampType,
Record record) {
try {
long offset = record.offset();
long timestamp = record.timestamp();
@@ -324,6 +321,7 @@ public class CompletedFetch<K, V> {
valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
key, value, headers, leaderEpoch);
} catch (RuntimeException e) {
log.error("Deserializers with error: {}", fetchConfig.deserializers);
throw new RecordDeserializationException(partition, record.offset(),
"Error deserializing key/value for partition " + partition +
" at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (2)

@@ -586,7 +586,7 @@ public class ConsumerNetworkClient implements Closeable {
public void tryConnect(Node node) {
lock.lock();
try {
client.ready(node, time.milliseconds());
NetworkClientUtils.tryConnect(client, node, time);
} finally {
lock.unlock();
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java (25)

@@ -26,6 +26,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -34,6 +38,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import java.util.Collections;
import java.util.List;
@@ -41,6 +46,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public final class ConsumerUtils {
@@ -141,4 +148,22 @@ public final class ConsumerUtils {
return (List<ConsumerInterceptor<K, V>>) ClientUtils.configuredInterceptors(config, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
}
public static <T> T getResult(CompletableFuture<T> future, Timer timer) {
try {
return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof WakeupException)
throw new WakeupException();
else if (t instanceof KafkaException)
throw (KafkaException) t;
else
throw new KafkaException(t);
} catch (InterruptedException e) {
throw new InterruptException(e);
} catch (java.util.concurrent.TimeoutException e) {
throw new TimeoutException(e);
}
}
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java (1)

@@ -165,6 +165,7 @@ public class DefaultBackgroundThread extends KafkaThread {
retryBackoffMs,
requestTimeoutMs,
apiVersions,
networkClientDelegate,
logContext);
CoordinatorRequestManager coordinatorRequestManager = null;
CommitRequestManager commitRequestManager = null;

clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java (1)

@@ -43,6 +43,7 @@ import java.util.concurrent.LinkedBlockingQueue;
* {@code BackgroundEvent} from the {@link DefaultBackgroundThread}.
*/
public class DefaultEventHandler implements EventHandler {
private final BlockingQueue<ApplicationEvent> applicationEventQueue;
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final DefaultBackgroundThread backgroundThread;

clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java (149)

@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.io.Closeable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
/**
* {@code FetchBuffer} buffers up {@link CompletedFetch the results} from the broker responses as they are received.
* It is essentially a wrapper around a {@link java.util.Queue} of {@link CompletedFetch}. There is at most one
* {@link CompletedFetch} per partition in the queue.
*
* <p/>
*
* <em>Note</em>: this class is not thread-safe and is intended to only be used from a single thread.
*/
public class FetchBuffer implements Closeable {
private final Logger log;
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
private CompletedFetch nextInLineFetch;
public FetchBuffer(final LogContext logContext) {
this.log = logContext.logger(FetchBuffer.class);
this.completedFetches = new ConcurrentLinkedQueue<>();
}
/**
* Returns {@code true} if there are no completed fetches pending to return to the user.
*
* @return {@code true} if the buffer is empty, {@code false} otherwise
*/
boolean isEmpty() {
return completedFetches.isEmpty();
}
/**
* Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has
* visibility for testing.
*
* @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise
*/
boolean hasCompletedFetches(Predicate<CompletedFetch> predicate) {
return completedFetches.stream().anyMatch(predicate);
}
void add(CompletedFetch completedFetch) {
completedFetches.add(completedFetch);
}
void addAll(Collection<CompletedFetch> completedFetches) {
this.completedFetches.addAll(completedFetches);
}
CompletedFetch nextInLineFetch() {
return nextInLineFetch;
}
void setNextInLineFetch(CompletedFetch completedFetch) {
this.nextInLineFetch = completedFetch;
}
CompletedFetch peek() {
return completedFetches.peek();
}
CompletedFetch poll() {
return completedFetches.poll();
}
/**
* Updates the buffer to retain only the fetch data that corresponds to the given partitions. Any previously
* {@link CompletedFetch fetched data} is removed if its partition is not in the given set of partitions.
*
* @param partitions {@link Set} of {@link TopicPartition}s for which any buffered data should be kept
*/
void retainAll(final Set<TopicPartition> partitions) {
completedFetches.removeIf(cf -> maybeDrain(partitions, cf));
if (maybeDrain(partitions, nextInLineFetch))
nextInLineFetch = null;
}
private boolean maybeDrain(final Set<TopicPartition> partitions, final CompletedFetch completedFetch) {
if (completedFetch != null && !partitions.contains(completedFetch.partition)) {
log.debug("Removing {} from buffered fetch data as it is not in the set of partitions to retain ({})", completedFetch.partition, partitions);
completedFetch.drain();
return true;
} else {
return false;
}
}
/**
* Return the set of {@link TopicPartition partitions} for which we have data in the buffer.
*
* @return {@link TopicPartition Partition} set
*/
Set<TopicPartition> bufferedPartitions() {
final Set<TopicPartition> partitions = new HashSet<>();
if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
partitions.add(nextInLineFetch.partition);
}
completedFetches.forEach(cf -> partitions.add(cf.partition));
return partitions;
}
@Override
public void close() {
idempotentCloser.close(() -> {
log.debug("Closing the fetch buffer");
if (nextInLineFetch != null) {
nextInLineFetch.drain();
nextInLineFetch = null;
}
completedFetches.forEach(CompletedFetch::drain);
completedFetches.clear();
}, () -> log.warn("The fetch buffer was previously closed"));
}
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java (372)

@@ -0,0 +1,372 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import static org.apache.kafka.clients.consumer.internals.FetchUtils.requestMetadataUpdate;
/**
* {@code FetchCollector} operates at the {@link RecordBatch} level, as that is what is stored in the
* {@link FetchBuffer}. Each {@link org.apache.kafka.common.record.Record} in the {@link RecordBatch} is converted
* to a {@link ConsumerRecord} and added to the returned {@link Fetch}.
*
* @param <K> Record key type
* @param <V> Record value type
*/
public class FetchCollector<K, V> {
private final Logger log;
private final ConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final FetchConfig<K, V> fetchConfig;
private final FetchMetricsManager metricsManager;
private final Time time;
public FetchCollector(final LogContext logContext,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final FetchConfig<K, V> fetchConfig,
final FetchMetricsManager metricsManager,
final Time time) {
this.log = logContext.logger(FetchCollector.class);
this.metadata = metadata;
this.subscriptions = subscriptions;
this.fetchConfig = fetchConfig;
this.metricsManager = metricsManager;
this.time = time;
}
/**
* Return the fetched {@link ConsumerRecord records}, empty the {@link FetchBuffer record buffer}, and
* update the consumed position.
*
* </p>
*
* NOTE: returning an {@link Fetch#empty() empty} fetch guarantees the consumed position is not updated.
*
* @param fetchBuffer {@link FetchBuffer} from which to retrieve the {@link ConsumerRecord records}
*
* @return A {@link Fetch} for the requested partitions
* @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
* @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
*/
public Fetch<K, V> collectFetch(final FetchBuffer fetchBuffer) {
final Fetch<K, V> fetch = Fetch.empty();
final Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
int recordsRemaining = fetchConfig.maxPollRecords;
try {
while (recordsRemaining > 0) {
final CompletedFetch nextInLineFetch = fetchBuffer.nextInLineFetch();
if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
final CompletedFetch completedFetch = fetchBuffer.peek();
if (completedFetch == null)
break;
if (!completedFetch.isInitialized()) {
try {
fetchBuffer.setNextInLineFetch(initialize(completedFetch));
} catch (Exception e) {
// Remove a completedFetch upon a parse with exception if (1) it contains no records, and
// (2) there are no fetched records with actual content preceding this exception.
// The first condition ensures that the completedFetches is not stuck with the same completedFetch
// in cases such as the TopicAuthorizationException, and the second condition ensures that no
// potential data loss due to an exception in a following record.
if (fetch.isEmpty() && FetchResponse.recordsOrFail(completedFetch.partitionData).sizeInBytes() == 0)
fetchBuffer.poll();
throw e;
}
} else {
fetchBuffer.setNextInLineFetch(completedFetch);
}
fetchBuffer.poll();
} else if (subscriptions.isPaused(nextInLineFetch.partition)) {
// when the partition is paused we add the records back to the completedFetches queue instead of draining
// them so that they can be returned on a subsequent poll if the partition is resumed at that time
log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition);
pausedCompletedFetches.add(nextInLineFetch);
fetchBuffer.setNextInLineFetch(null);
} else {
final Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch);
recordsRemaining -= nextFetch.numRecords();
fetch.add(nextFetch);
}
}
} catch (KafkaException e) {
if (fetch.isEmpty())
throw e;
} finally {
// add any polled completed fetches for paused partitions back to the completed fetches queue to be
// re-evaluated in the next poll
fetchBuffer.addAll(pausedCompletedFetches);
}
return fetch;
}
private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch) {
final TopicPartition tp = nextInLineFetch.partition;
if (!subscriptions.isAssigned(tp)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
} else if (!subscriptions.isFetchable(tp)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's
// poll call or if the offset is being reset
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
} else {
SubscriptionState.FetchPosition position = subscriptions.position(tp);
if (position == null)
throw new IllegalStateException("Missing position for fetchable partition " + tp);
if (nextInLineFetch.nextFetchOffset() == position.offset) {
List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig, fetchConfig.maxPollRecords);
log.trace("Returning {} fetched records at offset {} for assigned partition {}",
partRecords.size(), position, tp);
boolean positionAdvanced = false;
if (nextInLineFetch.nextFetchOffset() > position.offset) {
SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(),
position.currentLeader);
log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
position, nextPosition, tp, partRecords.size());
subscriptions.position(tp, nextPosition);
positionAdvanced = true;
}
Long partitionLag = subscriptions.partitionLag(tp, fetchConfig.isolationLevel);
if (partitionLag != null)
metricsManager.recordPartitionLag(tp, partitionLag);
Long lead = subscriptions.partitionLead(tp);
if (lead != null) {
metricsManager.recordPartitionLead(tp, lead);
}
return Fetch.forPartition(tp, partRecords, positionAdvanced);
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
tp, nextInLineFetch.nextFetchOffset(), position);
}
}
log.trace("Draining fetched records for partition {}", tp);
nextInLineFetch.drain();
return Fetch.empty();
}
/**
* Initialize a CompletedFetch object.
*/
protected CompletedFetch initialize(final CompletedFetch completedFetch) {
final TopicPartition tp = completedFetch.partition;
final Errors error = Errors.forCode(completedFetch.partitionData.errorCode());
boolean recordMetrics = true;
try {
if (!subscriptions.hasValidPosition(tp)) {
// this can happen when a rebalance happened while fetch is still in-flight
log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
return null;
} else if (error == Errors.NONE) {
final CompletedFetch ret = handleInitializeSuccess(completedFetch);
recordMetrics = ret == null;
return ret;
} else {
handleInitializeErrors(completedFetch, error);
return null;
}
} finally {
if (recordMetrics) {
completedFetch.recordAggregatedMetrics(0, 0);
}
if (error != Errors.NONE)
// we move the partition to the end if there was an error. This way, it's more likely that partitions for
// the same topic can remain together (allowing for more efficient serialization).
subscriptions.movePartitionToEnd(tp);
}
}
private CompletedFetch handleInitializeSuccess(final CompletedFetch completedFetch) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
SubscriptionState.FetchPosition position = subscriptions.position(tp);
if (position == null || position.offset != fetchOffset) {
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
return null;
}
final FetchResponseData.PartitionData partition = completedFetch.partitionData;
log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
FetchResponse.recordsSize(partition), tp, position);
Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator();
if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) {
if (completedFetch.requestVersion < 3) {
// Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
recordTooLargePartitions + " whose size is larger than the fetch size " + fetchConfig.fetchSize +
" and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
"newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
recordTooLargePartitions);
} else {
// This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
fetchOffset + ". Received a non-empty fetch response from the server, but no " +
"complete records were found.");
}
}
if (partition.highWatermark() >= 0) {
log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark());
subscriptions.updateHighWatermark(tp, partition.highWatermark());
}
if (partition.logStartOffset() >= 0) {
log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset());
subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
}
if (partition.lastStableOffset() >= 0) {
log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset());
subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
}
if (FetchResponse.isPreferredReplica(partition)) {
subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica(), () -> {
long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
tp, partition.preferredReadReplica(), expireTimeMs);
return expireTimeMs;
});
}
completedFetch.setInitialized();
return completedFetch;
}
private void handleInitializeErrors(final CompletedFetch completedFetch, final Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();
if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
error == Errors.OFFSET_NOT_AVAILABLE) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
requestMetadataUpdate(metadata, subscriptions, tp);
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
} else if (error == Errors.UNKNOWN_TOPIC_ID) {
log.warn("Received unknown topic ID error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
} else if (error == Errors.INCONSISTENT_TOPIC_ID) {
log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
if (!clearedReplicaId.isPresent()) {
// If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
SubscriptionState.FetchPosition position = subscriptions.position(tp);
if (position == null || fetchOffset != position.offset) {
log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
"does not match the current offset {}", tp, fetchOffset, position);
} else {
String errorMessage = "Fetch position " + position + " is out of range for partition " + tp;
if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("{}, resetting offset", errorMessage);
subscriptions.requestOffsetReset(tp);
} else {
log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
throw new OffsetOutOfRangeException(errorMessage,
Collections.singletonMap(tp, position.offset));
}
}
} else {
log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
clearedReplicaId.get(), tp, error, fetchOffset);
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
// We log the actual partition and not just the topic to help with ACL propagation issues in large clusters.
log.warn("Not authorized to read from partition {}.", tp);
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown server error while fetching offset {} for topic-partition {}",
fetchOffset, tp);
} else if (error == Errors.CORRUPT_MESSAGE) {
throw new KafkaException("Encountered corrupt message when fetching offset "
+ fetchOffset
+ " for topic-partition "
+ tp);
} else {
throw new IllegalStateException("Unexpected error code "
+ error.code()
+ " while fetching at offset "
+ fetchOffset
+ " from topic-partition " + tp);
}
}
}
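For orientation, here is a minimal sketch (not part of this patch) of how a fetcher-style caller might wire a FetchBuffer to a FetchCollector, based only on the constructor and collectFetch() shown above and on the delegation in Fetcher later in this diff. The wrapper class and method names are illustrative, and it is assumed to live in the consumer internals package.

// Hypothetical sketch, assumed to sit in the consumer internals package so it can use
// the types above directly; class and method names are illustrative only.
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

public class FetchPipelineSketch<K, V> {
    private final FetchBuffer fetchBuffer;
    private final FetchCollector<K, V> fetchCollector;

    public FetchPipelineSketch(LogContext logContext,
                               ConsumerMetadata metadata,
                               SubscriptionState subscriptions,
                               FetchConfig<K, V> fetchConfig,
                               FetchMetricsManager metricsManager,
                               Time time) {
        // The buffer holds raw CompletedFetch results; the collector turns them into records.
        this.fetchBuffer = new FetchBuffer(logContext);
        this.fetchCollector = new FetchCollector<>(logContext, metadata, subscriptions, fetchConfig, metricsManager, time);
    }

    // Network-side code is assumed to call fetchBuffer.add(completedFetch) as responses arrive;
    // the poll path then drains whatever is buffered into a Fetch for the application thread.
    public Fetch<K, V> pollCollectedRecords() {
        // An empty Fetch guarantees the consumed position was not advanced (see Javadoc above).
        return fetchCollector.collectFetch(fetchBuffer);
    }
}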

53
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.TopicPartition;
/**
* {@code FetchUtils} provides a place for disparate parts of the fetch logic to live.
*/
public class FetchUtils {
/**
* Performs two combined actions based on the state related to the {@link TopicPartition}:
*
* <ol>
* <li>
* Invokes {@link ConsumerMetadata#requestUpdate(boolean)} to signal that the metadata is incorrect and
* needs to be updated
* </li>
* <li>
* Invokes {@link SubscriptionState#clearPreferredReadReplica(TopicPartition)} to clear out any read replica
* information that may be present.
* </li>
* </ol>
*
 * This utility method should be invoked if the client detects (or is told by a node in the cluster) that an
 * attempt was made to fetch from a node that isn't the leader or preferred replica.
*
* @param metadata {@link ConsumerMetadata} for which to request an update
* @param subscriptions {@link SubscriptionState} to clear any internal read replica node
* @param topicPartition {@link TopicPartition} for which this state change is related
*/
static void requestMetadataUpdate(final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final TopicPartition topicPartition) {
metadata.requestUpdate(false);
subscriptions.clearPreferredReadReplica(topicPartition);
}
}
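As a usage illustration, the sketch below shows the kind of call site requestMetadataUpdate() is intended for, mirroring the error handling in FetchCollector above. The class, method, and choice of error codes are illustrative, and the sketch is assumed to sit in the same internals package since the helper is package-private.

// Illustrative only: not part of this patch.
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;

final class FetchErrorHandlingSketch {
    static void onFetchError(ConsumerMetadata metadata,
                             SubscriptionState subscriptions,
                             TopicPartition tp,
                             Errors error) {
        if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == Errors.FENCED_LEADER_EPOCH) {
            // The node we fetched from is not (or is no longer) the leader/preferred replica:
            // request fresh metadata and clear any cached preferred read replica for the partition.
            FetchUtils.requestMetadataUpdate(metadata, subscriptions, tp);
        }
    }
}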

35
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@@ -19,14 +19,14 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class manages the fetching process with the brokers.
@@ -49,8 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class Fetcher<K, V> extends AbstractFetch<K, V> {
private final Logger log;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final FetchCollector<K, V> fetchCollector;
public Fetcher(LogContext logContext,
ConsumerNetworkClient client,
@@ -60,7 +59,16 @@ public class Fetcher<K, V> extends AbstractFetch<K, V> {
FetchMetricsManager metricsManager,
Time time) {
super(logContext, client, metadata, subscriptions, fetchConfig, metricsManager, time);
this.log = logContext.logger(Fetcher.class);
this.fetchCollector = new FetchCollector<>(logContext,
metadata,
subscriptions,
fetchConfig,
metricsManager,
time);
}
public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
fetchBuffer.retainAll(new HashSet<>(assignedPartitions));
}
/**
@@ -98,16 +106,7 @@ public class Fetcher<K, V> extends AbstractFetch<K, V> {
return fetchRequestMap.size();
}
public void close(final Timer timer) {
if (!isClosed.compareAndSet(false, true)) {
log.info("Fetcher {} is already closed.", this);
return;
}
// Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence,
// it is necessary to acquire a lock on the fetcher instance before modifying the states.
synchronized (this) {
super.close(timer);
}
public Fetch<K, V> collectFetch() {
return fetchCollector.collectFetch(fetchBuffer);
}
}
}

9
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java

@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
@@ -34,11 +35,13 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
@@ -51,6 +54,7 @@ public class NetworkClientDelegate implements AutoCloseable {
private final int requestTimeoutMs;
private final Queue<UnsentRequest> unsentRequests;
private final long retryBackoffMs;
private final Set<Node> tryConnectNodes;
public NetworkClientDelegate(
final Time time,
@@ -63,6 +67,11 @@
this.unsentRequests = new ArrayDeque<>();
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.tryConnectNodes = new HashSet<>();
}
public void tryConnect(Node node) {
NetworkClientUtils.tryConnect(client, node, time);
}
/**

5
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java

@@ -82,6 +82,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
private final long requestTimeoutMs;
private final Time time;
private final ApiVersions apiVersions;
private final NetworkClientDelegate networkClientDelegate;
public OffsetsRequestManager(final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
@@ -90,12 +91,14 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
final long retryBackoffMs,
final long requestTimeoutMs,
final ApiVersions apiVersions,
final NetworkClientDelegate networkClientDelegate,
final LogContext logContext) {
requireNonNull(subscriptionState);
requireNonNull(metadata);
requireNonNull(isolationLevel);
requireNonNull(time);
requireNonNull(apiVersions);
requireNonNull(networkClientDelegate);
requireNonNull(logContext);
this.metadata = metadata;
@@ -107,6 +110,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
this.time = time;
this.requestTimeoutMs = requestTimeoutMs;
this.apiVersions = apiVersions;
this.networkClientDelegate = networkClientDelegate;
this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState,
time, retryBackoffMs, apiVersions);
// Register the cluster metadata update callback. Note this only relies on the
@@ -429,6 +433,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
if (nodeApiVersions == null) {
networkClientDelegate.tryConnect(node);
return;
}

11
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java

@@ -364,17 +364,8 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
wakeupTrigger.setActiveTask(event.future());
eventHandler.add(event);
try {
return event.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptException(e);
} catch (TimeoutException e) {
throw new org.apache.kafka.common.errors.TimeoutException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof WakeupException)
throw new WakeupException();
throw new KafkaException(e);
return eventHandler.addAndGet(event, time.timer(timeout));
} finally {
wakeupTrigger.clearActiveTask();
}

44
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java

@@ -16,23 +16,51 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import java.util.Objects;
/**
* This is the abstract definition of the events created by the KafkaConsumer API
*/
abstract public class ApplicationEvent {
public final Type type;
public abstract class ApplicationEvent {
public enum Type {
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
}
private final Type type;
protected ApplicationEvent(Type type) {
this.type = type;
this.type = Objects.requireNonNull(type);
}
public Type type() {
return type;
}
@Override
public String toString() {
return type + " ApplicationEvent";
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ApplicationEvent that = (ApplicationEvent) o;
return type == that.type;
}
public enum Type {
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS,
@Override
public int hashCode() {
return type.hashCode();
}
protected String toStringBase() {
return "type=" + type;
}
@Override
public String toString() {
return "ApplicationEvent{" +
toStringBase() +
'}';
}
}

11
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

@@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -47,7 +46,7 @@ public class ApplicationEventProcessor {
public boolean process(final ApplicationEvent event) {
Objects.requireNonNull(event);
switch (event.type) {
switch (event.type()) {
case NOOP:
return process((NoopApplicationEvent) event);
case COMMIT:
@@ -78,7 +77,7 @@ public class ApplicationEventProcessor {
* @param event a {@link NoopApplicationEvent}
*/
private boolean process(final NoopApplicationEvent event) {
return backgroundEventQueue.add(new NoopBackgroundEvent(event.message));
return backgroundEventQueue.add(new NoopBackgroundEvent(event.message()));
}
private boolean process(final PollApplicationEvent event) {
@@ -87,7 +86,7 @@ public class ApplicationEventProcessor {
}
CommitRequestManager manager = requestManagers.commitRequestManager.get();
manager.updateAutoCommitTimer(event.pollTimeMs);
manager.updateAutoCommitTimer(event.pollTimeMs());
return true;
}
@@ -132,8 +131,8 @@ public class ApplicationEventProcessor {
return false;
}
CommitRequestManager manager = requestManagers.commitRequestManager.get();
manager.updateAutoCommitTimer(event.currentTimeMs);
manager.maybeAutoCommit(event.offsets);
manager.updateAutoCommitTimer(event.currentTimeMs());
manager.maybeAutoCommit(event.offsets());
return true;
}

48
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java

@@ -19,15 +19,55 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
public class AssignmentChangeApplicationEvent extends ApplicationEvent {
final Map<TopicPartition, OffsetAndMetadata> offsets;
final long currentTimeMs;
public AssignmentChangeApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long currentTimeMs) {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final long currentTimeMs;
public AssignmentChangeApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets,
final long currentTimeMs) {
super(Type.ASSIGNMENT_CHANGE);
this.offsets = offsets;
this.offsets = Collections.unmodifiableMap(offsets);
this.currentTimeMs = currentTimeMs;
}
public Map<TopicPartition, OffsetAndMetadata> offsets() {
return offsets;
}
public long currentTimeMs() {
return currentTimeMs;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
AssignmentChangeApplicationEvent that = (AssignmentChangeApplicationEvent) o;
if (currentTimeMs != that.currentTimeMs) return false;
return offsets.equals(that.offsets);
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + offsets.hashCode();
result = 31 * result + (int) (currentTimeMs ^ (currentTimeMs >>> 32));
return result;
}
@Override
public String toString() {
return "AssignmentChangeApplicationEvent{" +
toStringBase() +
", offsets=" + offsets +
", currentTimeMs=" + currentTimeMs +
'}';
}
}

46
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java

@@ -16,16 +16,50 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import java.util.Objects;
/**
* This is the abstract definition of the events created by the background thread.
*/
abstract public class BackgroundEvent {
public final EventType type;
public abstract class BackgroundEvent {
public BackgroundEvent(EventType type) {
this.type = type;
}
public enum EventType {
public enum Type {
NOOP, ERROR,
}
protected final Type type;
public BackgroundEvent(Type type) {
this.type = Objects.requireNonNull(type);
}
public Type type() {
return type;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BackgroundEvent that = (BackgroundEvent) o;
return type == that.type;
}
@Override
public int hashCode() {
return type.hashCode();
}
protected String toStringBase() {
return "type=" + type;
}
@Override
public String toString() {
return "BackgroundEvent{" +
toStringBase() +
'}';
}
}

54
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java

@@ -19,46 +19,50 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class CommitApplicationEvent extends ApplicationEvent {
final private CompletableFuture<Void> future;
final private Map<TopicPartition, OffsetAndMetadata> offsets;
public class CommitApplicationEvent extends CompletableApplicationEvent<Void> {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) {
super(Type.COMMIT);
this.offsets = offsets;
Optional<Exception> exception = isValid(offsets);
if (exception.isPresent()) {
throw new RuntimeException(exception.get());
this.offsets = Collections.unmodifiableMap(offsets);
for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
if (offsetAndMetadata.offset() < 0) {
throw new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset());
}
}
this.future = new CompletableFuture<>();
}
public CompletableFuture<Void> future() {
return future;
}
public Map<TopicPartition, OffsetAndMetadata> offsets() {
return offsets;
}
private Optional<Exception> isValid(final Map<TopicPartition, OffsetAndMetadata> offsets) {
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
OffsetAndMetadata offsetAndMetadata = entry.getValue();
if (offsetAndMetadata.offset() < 0) {
return Optional.of(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
}
}
return Optional.empty();
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
CommitApplicationEvent that = (CommitApplicationEvent) o;
return offsets.equals(that.offsets);
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + offsets.hashCode();
return result;
}
@Override
public String toString() {
return "CommitApplicationEvent("
+ "offsets=" + offsets + ")";
return "CommitApplicationEvent{" +
toStringBase() +
", offsets=" + offsets +
'}';
}
}

31
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java

@@ -16,14 +16,10 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.common.utils.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* Application event with a result in the form of a future, that can be retrieved within a
@@ -33,7 +29,7 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
protected final CompletableFuture<T> future;
private final CompletableFuture<T> future;
protected CompletableApplicationEvent(Type type) {
super(type);
@@ -45,20 +41,7 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
}
public T get(Timer timer) {
try {
return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof KafkaException)
throw (KafkaException) t;
else
throw new KafkaException(t);
} catch (InterruptedException e) {
throw new InterruptException(e);
} catch (java.util.concurrent.TimeoutException e) {
throw new TimeoutException(e);
}
return ConsumerUtils.getResult(future, timer);
}
public void chain(final CompletableFuture<T> providedFuture) {
@@ -89,11 +72,15 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent {
return result;
}
@Override
protected String toStringBase() {
return super.toStringBase() + ", future=" + future;
}
@Override
public String toString() {
return getClass().getSimpleName() + "{" +
"future=" + future +
", type=" + type +
toStringBase() +
'}';
}
}
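The inline try/catch removed above is now centralized in ConsumerUtils.getResult(future, timer). The sketch below reconstructs the exception-translation behavior that helper is expected to provide, based only on the deleted inline code; the actual implementation in ConsumerUtils may differ in details.

// Illustrative reconstruction of the translation logic, not the actual ConsumerUtils source.
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Timer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

final class GetResultSketch {
    static <T> T getResult(CompletableFuture<T> future, Timer timer) {
        try {
            // Block for at most the time remaining on the caller's timer.
            return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            // Re-throw Kafka exceptions as-is; wrap anything else so callers see a KafkaException.
            if (t instanceof KafkaException)
                throw (KafkaException) t;
            else
                throw new KafkaException(t);
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        } catch (java.util.concurrent.TimeoutException e) {
            throw new TimeoutException(e);
        }
    }
}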

41
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java

@@ -17,10 +17,41 @@
package org.apache.kafka.clients.consumer.internals.events;
public class ErrorBackgroundEvent extends BackgroundEvent {
private final Throwable exception;
public ErrorBackgroundEvent(Throwable e) {
super(EventType.ERROR);
exception = e;
private final Throwable error;
public ErrorBackgroundEvent(Throwable error) {
super(Type.ERROR);
this.error = error;
}
public Throwable error() {
return error;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ErrorBackgroundEvent that = (ErrorBackgroundEvent) o;
return error.equals(that.error);
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + error.hashCode();
return result;
}
@Override
public String toString() {
return "ErrorBackgroundEvent{" +
toStringBase() +
", error=" + error +
'}';
}
}
}

3
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java

@@ -86,7 +86,8 @@ public class ListOffsetsApplicationEvent extends CompletableApplicationEvent<Map
@Override
public String toString() {
return getClass().getSimpleName() + " {" +
"timestampsToSearch=" + timestampsToSearch + ", " +
toStringBase() +
", timestampsToSearch=" + timestampsToSearch + ", " +
"requireTimestamps=" + requireTimestamps + '}';
}

7
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java

@@ -21,4 +21,11 @@ public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
public NewTopicsMetadataUpdateRequestEvent() {
super(Type.METADATA_UPDATE);
}
@Override
public String toString() {
return "NewTopicsMetadataUpdateRequestEvent{" +
toStringBase() +
'}';
}
}

38
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopApplicationEvent.java

@@ -16,19 +16,47 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import java.util.Objects;
/**
* The event is NoOp. This is intentionally left here for demonstration purpose.
* The event is a no-op, but is intentionally left here for demonstration and test purposes.
*/
public class NoopApplicationEvent extends ApplicationEvent {
public final String message;
private final String message;
public NoopApplicationEvent(final String message) {
super(Type.NOOP);
this.message = message;
this.message = Objects.requireNonNull(message);
}
public String message() {
return message;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
NoopApplicationEvent that = (NoopApplicationEvent) o;
return message.equals(that.message);
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + message.hashCode();
return result;
}
@Override
public String toString() {
return getClass() + "_" + this.message;
return "NoopApplicationEvent{" +
toStringBase() +
",message='" + message + '\'' +
'}';
}
}
}

42
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoopBackgroundEvent.java → clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NoopBackgroundEvent.java

@@ -14,23 +14,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import java.util.Objects;
/**
* Noop event. Intentionally left it here for demonstration purpose.
 * No-op event. Intentionally left here for demonstration purposes.
*/
public class NoopBackgroundEvent extends BackgroundEvent {
public final String message;
private final String message;
public NoopBackgroundEvent(final String message) {
super(EventType.NOOP);
this.message = message;
super(Type.NOOP);
this.message = Objects.requireNonNull(message);
}
public String message() {
return message;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
NoopBackgroundEvent that = (NoopBackgroundEvent) o;
return message.equals(that.message);
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + message.hashCode();
return result;
}
@Override
public String toString() {
return getClass() + "_" + this.message;
return "NoopBackgroundEvent{" +
toStringBase() +
", message='" + message + '\'' +
'}';
}
}
}

6
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java

@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
public class OffsetFetchApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
private final Set<TopicPartition> partitions;
public OffsetFetchApplicationEvent(final Set<TopicPartition> partitions) {
@@ -56,9 +57,8 @@ public class OffsetFetchApplicationEvent extends CompletableApplicationEvent<Map
@Override
public String toString() {
return getClass().getSimpleName() + "{" +
"partitions=" + partitions +
", future=" + future +
", type=" + type +
toStringBase() +
", partitions=" + partitions +
'}';
}
}

39
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java

@@ -17,10 +17,41 @@
package org.apache.kafka.clients.consumer.internals.events;
public class PollApplicationEvent extends ApplicationEvent {
public final long pollTimeMs;
protected PollApplicationEvent(final long currentTimeMs) {
private final long pollTimeMs;
public PollApplicationEvent(final long pollTimeMs) {
super(Type.POLL);
this.pollTimeMs = currentTimeMs;
this.pollTimeMs = pollTimeMs;
}
public long pollTimeMs() {
return pollTimeMs;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
PollApplicationEvent that = (PollApplicationEvent) o;
return pollTimeMs == that.pollTimeMs;
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (int) (pollTimeMs ^ (pollTimeMs >>> 32));
return result;
}
@Override
public String toString() {
return "PollApplicationEvent{" +
toStringBase() +
", pollTimeMs=" + pollTimeMs +
'}';
}
}
}

174
clients/src/main/java/org/apache/kafka/common/internals/IdempotentCloser.java

@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* {@code IdempotentCloser} encapsulates some basic logic to ensure that a given resource is only closed once.
* The underlying mechanism for ensuring that the close only happens once <em>and</em> is thread safe
 * is {@link AtomicBoolean#compareAndSet(boolean, boolean)}. Users can provide callbacks (via optional
 * {@link Runnable}s) for the <em>initial</em> close and/or any <em>subsequent</em> closes.
*
* <p/>
*
* Here's an example:
*
* <pre>
*
* public class MyDataFile implements Closeable {
*
* private final IdempotentCloser closer = new IdempotentCloser();
*
* private final File file;
*
* . . .
*
 *     public void write() {
* closer.assertOpen(() -> String.format("Data file %s already closed!", file));
* writeToFile();
* }
*
* public boolean isClosed() {
* return closer.isClosed();
* }
*
* &#064;Override
* public void close() {
* Runnable onInitialClose = () -> {
* cleanUpFile(file);
* log.debug("Data file {} closed", file);
* };
* Runnable onSubsequentClose = () -> {
* log.warn("Data file {} already closed!", file);
* };
* closer.close(onInitialClose, onSubsequentClose);
* }
* }
* </pre>
*/
public class IdempotentCloser implements AutoCloseable {
private final AtomicBoolean isClosed;
/**
* Creates an {@code IdempotentCloser} that is not yet closed.
*/
public IdempotentCloser() {
this(false);
}
/**
* Creates an {@code IdempotentCloser} with the given initial state.
*
* @param isClosed Initial value for underlying state
*/
public IdempotentCloser(boolean isClosed) {
this.isClosed = new AtomicBoolean(isClosed);
}
/**
* This method serves as an assert that the {@link IdempotentCloser} is still open. If it is open, this method
* simply returns. If it is closed, a new {@link IllegalStateException} will be thrown using the supplied message.
*
* @param message {@link Supplier} that supplies the message for the exception
*/
public void assertOpen(Supplier<String> message) {
if (isClosed.get())
throw new IllegalStateException(message.get());
}
/**
* This method serves as an assert that the {@link IdempotentCloser} is still open. If it is open, this method
* simply returns. If it is closed, a new {@link IllegalStateException} will be thrown using the given message.
*
* @param message Message to use for the exception
*/
public void assertOpen(String message) {
if (isClosed.get())
throw new IllegalStateException(message);
}
public boolean isClosed() {
return isClosed.get();
}
/**
* Closes the resource in a thread-safe manner.
*
* <p/>
*
 * After the execution has completed, calls to {@link #isClosed()} will return {@code true} and calls to
* {@link #assertOpen(String)} and {@link #assertOpen(Supplier)}
* will throw an {@link IllegalStateException}.
*/
@Override
public void close() {
close(null, null);
}
/**
* Closes the resource in a thread-safe manner.
*
* <p/>
*
 * After the execution has completed, calls to {@link #isClosed()} will return {@code true} and calls to
* {@link #assertOpen(String)} and {@link #assertOpen(Supplier)}
* will throw an {@link IllegalStateException}.
*
* @param onInitialClose Optional {@link Runnable} to execute when the resource is closed. Note that the
* object will still be considered closed even if an exception is thrown during the course
* of its execution; can be {@code null}
*/
public void close(final Runnable onInitialClose) {
close(onInitialClose, null);
}
/**
* Closes the resource in a thread-safe manner.
*
* <p/>
*
 * After the execution has completed, calls to {@link #isClosed()} will return {@code true} and calls to
* {@link #assertOpen(String)} and {@link #assertOpen(Supplier)}
* will throw an {@link IllegalStateException}.
*
* @param onInitialClose Optional {@link Runnable} to execute when the resource is closed. Note that the
* object will still be considered closed even if an exception is thrown during the course
* of its execution; can be {@code null}
* @param onSubsequentClose Optional {@link Runnable} to execute if this resource was previously closed. Note that
* no state will be affected if an exception is thrown during its execution; can be
* {@code null}
*/
public void close(final Runnable onInitialClose, final Runnable onSubsequentClose) {
if (isClosed.compareAndSet(false, true)) {
if (onInitialClose != null)
onInitialClose.run();
} else {
if (onSubsequentClose != null)
onSubsequentClose.run();
}
}
@Override
public String toString() {
return "IdempotentCloser{" +
"isClosed=" + isClosed +
'}';
}
}
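A minimal usage sketch of the close callbacks (illustrative, not part of this patch): the first close() runs the initial callback exactly once, later calls only run the subsequent callback, and assertOpen(...) starts throwing once the resource is closed.

import org.apache.kafka.common.internals.IdempotentCloser;

public class IdempotentCloserUsageSketch {
    public static void main(String[] args) {
        IdempotentCloser closer = new IdempotentCloser();
        closer.assertOpen("resource should still be open"); // no-op while open

        // First close: only the "initial" callback runs.
        closer.close(
                () -> System.out.println("released resources"),
                () -> System.out.println("already closed"));

        // Second close: only the "subsequent" callback runs; the closed state is unchanged.
        closer.close(
                () -> System.out.println("released resources"),
                () -> System.out.println("already closed")); // prints "already closed"

        System.out.println(closer.isClosed()); // prints: true
    }
}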

216
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java

@@ -67,19 +67,23 @@ public class CompletedFetchTest {
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
.setRecords(newRecords(startingOffset, numRecords, fetchOffset));
CompletedFetch<String, String> completedFetch = newCompletedFetch(fetchOffset, partitionData);
FetchConfig<String, String> fetchConfig = newFetchConfig(new StringDeserializer(),
new StringDeserializer(),
IsolationLevel.READ_UNCOMMITTED,
true);
CompletedFetch completedFetch = newCompletedFetch(fetchOffset, partitionData);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, 10);
assertEquals(10, records.size());
ConsumerRecord<String, String> record = records.get(0);
assertEquals(10, record.offset());
records = completedFetch.fetchRecords(10);
records = completedFetch.fetchRecords(fetchConfig, 10);
assertEquals(1, records.size());
record = records.get(0);
assertEquals(20, record.offset());
records = completedFetch.fetchRecords(10);
records = completedFetch.fetchRecords(fetchConfig, 10);
assertEquals(0, records.size());
}
@@ -92,21 +96,23 @@ public class CompletedFetchTest {
.setRecords(rawRecords)
.setAbortedTransactions(newAbortedTransactions());
CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_COMMITTED,
OffsetResetStrategy.NONE,
true,
0,
partitionData);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
assertEquals(0, records.size());
completedFetch = newCompletedFetch(IsolationLevel.READ_UNCOMMITTED,
OffsetResetStrategy.NONE,
true,
0,
partitionData);
records = completedFetch.fetchRecords(10);
assertEquals(numRecords, records.size());
try (final StringDeserializer deserializer = new StringDeserializer()) {
FetchConfig<String, String> fetchConfig = newFetchConfig(deserializer,
deserializer,
IsolationLevel.READ_COMMITTED,
true);
CompletedFetch completedFetch = newCompletedFetch(0, partitionData);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, 10);
assertEquals(0, records.size());
fetchConfig = newFetchConfig(deserializer,
deserializer,
IsolationLevel.READ_UNCOMMITTED,
true);
completedFetch = newCompletedFetch(0, partitionData);
records = completedFetch.fetchRecords(fetchConfig, 10);
assertEquals(numRecords, records.size());
}
}
@Test
@@ -115,13 +121,15 @@ public class CompletedFetchTest {
Records rawRecords = newTranscactionalRecords(ControlRecordType.COMMIT, numRecords);
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
.setRecords(rawRecords);
CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_COMMITTED,
OffsetResetStrategy.NONE,
true,
0,
partitionData);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
assertEquals(10, records.size());
CompletedFetch completedFetch = newCompletedFetch(0, partitionData);
try (final StringDeserializer deserializer = new StringDeserializer()) {
FetchConfig<String, String> fetchConfig = newFetchConfig(deserializer,
deserializer,
IsolationLevel.READ_COMMITTED,
true);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, 10);
assertEquals(10, records.size());
}
}
@Test
@@ -132,9 +140,13 @@ public class CompletedFetchTest {
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
.setRecords(newRecords(startingOffset, numRecords, fetchOffset));
CompletedFetch<String, String> completedFetch = newCompletedFetch(fetchOffset, partitionData);
CompletedFetch completedFetch = newCompletedFetch(fetchOffset, partitionData);
FetchConfig<String, String> fetchConfig = newFetchConfig(new StringDeserializer(),
new StringDeserializer(),
IsolationLevel.READ_UNCOMMITTED,
true);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(-10);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, -10);
assertEquals(0, records.size());
}
@@ -146,82 +158,72 @@ public class CompletedFetchTest {
.setLastStableOffset(20)
.setLogStartOffset(0);
CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_UNCOMMITTED,
OffsetResetStrategy.NONE,
false,
1,
partitionData);
CompletedFetch completedFetch = newCompletedFetch(1, partitionData);
try (final StringDeserializer deserializer = new StringDeserializer()) {
FetchConfig<String, String> fetchConfig = newFetchConfig(deserializer,
deserializer,
IsolationLevel.READ_UNCOMMITTED,
true);
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
assertEquals(0, records.size());
List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(fetchConfig, 10);
assertEquals(0, records.size());
}
}
@Test
public void testCorruptedMessage() {
// Create one good record and then one "corrupted" record.
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0);
builder.append(new SimpleRecord(new UUIDSerializer().serialize(TOPIC_NAME, UUID.randomUUID())));
builder.append(0L, "key".getBytes(), "value".getBytes());
Records records = builder.build();
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
.setPartitionIndex(0)
.setHighWatermark(10)
.setLastStableOffset(20)
.setLogStartOffset(0)
.setRecords(records);
CompletedFetch<UUID, UUID> completedFetch = newCompletedFetch(new UUIDDeserializer(),
new UUIDDeserializer(),
IsolationLevel.READ_COMMITTED,
OffsetResetStrategy.NONE,
false,
0,
partitionData);
completedFetch.fetchRecords(10);
assertThrows(RecordDeserializationException.class, () -> completedFetch.fetchRecords(10));
}
private CompletedFetch<String, String> newCompletedFetch(long fetchOffset,
FetchResponseData.PartitionData partitionData) {
return newCompletedFetch(
IsolationLevel.READ_UNCOMMITTED,
OffsetResetStrategy.NONE,
true,
fetchOffset,
partitionData);
try (final MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0);
final UUIDSerializer serializer = new UUIDSerializer();
final UUIDDeserializer deserializer = new UUIDDeserializer()) {
builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
builder.append(0L, "key".getBytes(), "value".getBytes());
Records records = builder.build();
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
.setPartitionIndex(0)
.setHighWatermark(10)
.setLastStableOffset(20)
.setLogStartOffset(0)
.setRecords(records);
FetchConfig<UUID, UUID> fetchConfig = newFetchConfig(deserializer,
deserializer,
IsolationLevel.READ_COMMITTED,
false);
CompletedFetch completedFetch = newCompletedFetch(0, partitionData);
completedFetch.fetchRecords(fetchConfig, 10);
assertThrows(RecordDeserializationException.class,
() -> completedFetch.fetchRecords(fetchConfig, 10));
}
}
private CompletedFetch<String, String> newCompletedFetch(IsolationLevel isolationLevel,
OffsetResetStrategy offsetResetStrategy,
boolean checkCrcs,
long fetchOffset,
FetchResponseData.PartitionData partitionData) {
return newCompletedFetch(new StringDeserializer(),
new StringDeserializer(),
isolationLevel,
offsetResetStrategy,
checkCrcs,
fetchOffset,
partitionData);
}
private <K, V> CompletedFetch<K, V> newCompletedFetch(Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
IsolationLevel isolationLevel,
OffsetResetStrategy offsetResetStrategy,
boolean checkCrcs,
long fetchOffset,
FetchResponseData.PartitionData partitionData) {
private CompletedFetch newCompletedFetch(long fetchOffset,
FetchResponseData.PartitionData partitionData) {
LogContext logContext = new LogContext();
SubscriptionState subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
SubscriptionState subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry();
FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(), metricsRegistry);
FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metrics, Collections.singleton(TP));
FetchConfig<K, V> fetchConfig = new FetchConfig<>(
return new CompletedFetch(
logContext,
subscriptions,
BufferSupplier.create(),
TP,
partitionData,
metricAggregator,
fetchOffset,
ApiKeys.FETCH.latestVersion());
}
private static <K, V> FetchConfig<K, V> newFetchConfig(Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
IsolationLevel isolationLevel,
boolean checkCrcs) {
return new FetchConfig<>(
ConsumerConfig.DEFAULT_FETCH_MIN_BYTES,
ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS,
@@ -232,29 +234,21 @@ public class CompletedFetchTest {
new Deserializers<>(keyDeserializer, valueDeserializer),
isolationLevel
);
return new CompletedFetch<>(
logContext,
subscriptions,
fetchConfig,
BufferSupplier.create(),
TP,
partitionData,
metricAggregator,
fetchOffset,
ApiKeys.FETCH.latestVersion());
}
private Records newRecords(long baseOffset, int count, long firstMessageId) {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
for (int i = 0; i < count; i++)
builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes());
return builder.build();
try (final MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset)) {
for (int i = 0; i < count; i++)
builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes());
return builder.build();
}
}
private Records newTranscactionalRecords(ControlRecordType controlRecordType, int numRecords) {
Time time = new MockTime();
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE,
CompressionType.NONE,
TimestampType.CREATE_TIME,
@@ -264,12 +258,13 @@ public class CompletedFetchTest {
PRODUCER_EPOCH,
0,
true,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
for (int i = 0; i < numRecords; i++)
builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
for (int i = 0; i < numRecords; i++)
builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
builder.build();
}
builder.build();
writeTransactionMarker(buffer, controlRecordType, numRecords, time);
buffer.flip();
@@ -295,5 +290,4 @@ public class CompletedFetchTest {
abortedTransaction.setProducerId(PRODUCER_ID);
return Collections.singletonList(abortedTransaction);
}
}

194
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java

@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* This tests the {@link FetchBuffer} functionality in addition to what {@link FetcherTest} covers in its tests.
* One of the main concerns of these tests is that we correctly handle both places where data is held internally:
*
* <ol>
* <li>A special "next in line" buffer</li>
* <li>The remainder of the buffers in a queue</li>
* </ol>
*/
public class FetchBufferTest {
private final Time time = new MockTime(0, 0, 0);
private final TopicPartition topicAPartition0 = new TopicPartition("topic-a", 0);
private final TopicPartition topicAPartition1 = new TopicPartition("topic-a", 1);
private final TopicPartition topicAPartition2 = new TopicPartition("topic-a", 2);
private final Set<TopicPartition> allPartitions = partitions(topicAPartition0, topicAPartition1, topicAPartition2);
private LogContext logContext;
private SubscriptionState subscriptions;
private FetchMetricsManager metricsManager;
@BeforeEach
public void setup() {
logContext = new LogContext();
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
ConsumerConfig config = new ConsumerConfig(p);
subscriptions = createSubscriptionState(config, logContext);
Metrics metrics = createMetrics(config, time);
metricsManager = createFetchMetricsManager(metrics);
}
/**
* Verifies the basics: we can add completed fetches to the queue, peek to view them, and poll to remove them.
*/
@Test
public void testBasicPeekAndPoll() {
try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
CompletedFetch completedFetch = completedFetch(topicAPartition0);
assertTrue(fetchBuffer.isEmpty());
fetchBuffer.add(completedFetch);
assertTrue(fetchBuffer.hasCompletedFetches(p -> true));
assertFalse(fetchBuffer.isEmpty());
assertNotNull(fetchBuffer.peek());
assertSame(completedFetch, fetchBuffer.peek());
assertSame(completedFetch, fetchBuffer.poll());
assertNull(fetchBuffer.peek());
}
}
/**
* Verifies that {@link FetchBuffer#close()} closes the buffered data for both the queue and the next-in-line buffer.
*/
@Test
public void testCloseClearsData() {
// We don't use the try-with-resources approach because we want to have access to the FetchBuffer after
// the try block so that we can run our asserts on the object.
FetchBuffer fetchBuffer = null;
try {
fetchBuffer = new FetchBuffer(logContext);
assertNull(fetchBuffer.nextInLineFetch());
assertTrue(fetchBuffer.isEmpty());
fetchBuffer.add(completedFetch(topicAPartition0));
assertFalse(fetchBuffer.isEmpty());
fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
assertNotNull(fetchBuffer.nextInLineFetch());
} finally {
if (fetchBuffer != null)
fetchBuffer.close();
}
assertNull(fetchBuffer.nextInLineFetch());
assertTrue(fetchBuffer.isEmpty());
}
/**
* Tests that the buffer returns partitions for both the queue and the next-in-line buffer.
*/
@Test
public void testBufferedPartitions() {
try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
fetchBuffer.add(completedFetch(topicAPartition1));
fetchBuffer.add(completedFetch(topicAPartition2));
assertEquals(allPartitions, fetchBuffer.bufferedPartitions());
fetchBuffer.setNextInLineFetch(null);
assertEquals(partitions(topicAPartition1, topicAPartition2), fetchBuffer.bufferedPartitions());
fetchBuffer.poll();
assertEquals(partitions(topicAPartition2), fetchBuffer.bufferedPartitions());
fetchBuffer.poll();
assertEquals(partitions(), fetchBuffer.bufferedPartitions());
}
}
/**
* Tests that the buffer manipulates partitions for both the queue and the next-in-line buffer.
*/
@Test
public void testAddAllAndRetainAll() {
try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
fetchBuffer.setNextInLineFetch(completedFetch(topicAPartition0));
fetchBuffer.addAll(Arrays.asList(completedFetch(topicAPartition1), completedFetch(topicAPartition2)));
assertEquals(allPartitions, fetchBuffer.bufferedPartitions());
fetchBuffer.retainAll(partitions(topicAPartition1, topicAPartition2));
assertEquals(partitions(topicAPartition1, topicAPartition2), fetchBuffer.bufferedPartitions());
fetchBuffer.retainAll(partitions(topicAPartition2));
assertEquals(partitions(topicAPartition2), fetchBuffer.bufferedPartitions());
fetchBuffer.retainAll(partitions());
assertEquals(partitions(), fetchBuffer.bufferedPartitions());
}
}
private CompletedFetch completedFetch(TopicPartition tp) {
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData();
FetchMetricsAggregator metricsAggregator = new FetchMetricsAggregator(metricsManager, allPartitions);
return new CompletedFetch(
logContext,
subscriptions,
BufferSupplier.create(),
tp,
partitionData,
metricsAggregator,
0L,
ApiKeys.FETCH.latestVersion());
}
/**
* This is a handy utility method for returning a set from a varargs array.
*/
private static Set<TopicPartition> partitions(TopicPartition... partitions) {
return new HashSet<>(Arrays.asList(partitions));
}
}
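The class-level comment above calls out the two places FetchBuffer holds data: a single next-in-line slot plus a queue for everything else. The sketch below is not the real FetchBuffer; it is an invented, simplified two-tier buffer (class and method names are made up) that shows why peek() and poll() only consult the queue while the set of buffered keys must consult both tiers, matching testBasicPeekAndPoll and testBufferedPartitions.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

// Illustrative only: a simplified buffer with the same two-tier shape described above.
class TwoTierBuffer<K, T> {

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final Function<T, K> keyExtractor;
    private T nextInLine;

    TwoTierBuffer(Function<T, K> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    void add(T element) {
        queue.add(element);
    }

    // peek() and poll() only look at the queue, as exercised by testBasicPeekAndPoll.
    T peek() {
        return queue.peek();
    }

    T poll() {
        return queue.poll();
    }

    void setNextInLine(T element) {
        this.nextInLine = element;
    }

    // The buffered keys must include *both* tiers, as exercised by testBufferedPartitions.
    Set<K> bufferedKeys() {
        Set<K> keys = new HashSet<>();
        if (nextInLine != null)
            keys.add(keyExtractor.apply(nextInLine));
        for (T element : queue)
            keys.add(keyExtractor.apply(element));
        return keys;
    }
}

For example, a TwoTierBuffer<TopicPartition, CompletedFetch> keyed by the fetch's partition would report a partition as buffered whether its fetch sits in the queue or in the next-in-line slot.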

579
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java

@@ -0,0 +1,579 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchConfig;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* This tests the {@link FetchCollector} functionality in addition to what {@link FetcherTest} covers in its tests.
*/
public class FetchCollectorTest {
private final static int DEFAULT_RECORD_COUNT = 10;
private final static int DEFAULT_MAX_POLL_RECORDS = ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
private final Time time = new MockTime(0, 0, 0);
private final TopicPartition topicAPartition0 = new TopicPartition("topic-a", 0);
private final TopicPartition topicAPartition1 = new TopicPartition("topic-a", 1);
private final TopicPartition topicAPartition2 = new TopicPartition("topic-a", 2);
private final Set<TopicPartition> allPartitions = partitions(topicAPartition0, topicAPartition1, topicAPartition2);
private LogContext logContext;
private SubscriptionState subscriptions;
private FetchConfig<String, String> fetchConfig;
private FetchMetricsManager metricsManager;
private ConsumerMetadata metadata;
private FetchBuffer fetchBuffer;
private FetchCollector<String, String> fetchCollector;
private CompletedFetchBuilder completedFetchBuilder;
@Test
public void testFetchNormal() {
int recordCount = DEFAULT_MAX_POLL_RECORDS;
buildDependencies();
assignAndSeek(topicAPartition0);
CompletedFetch completedFetch = completedFetchBuilder
.recordCount(recordCount)
.build();
// Validate that the buffer is empty until after we add the fetch data.
assertTrue(fetchBuffer.isEmpty());
fetchBuffer.add(completedFetch);
assertFalse(fetchBuffer.isEmpty());
// Validate that the completed fetch isn't initialized just because we add it to the buffer.
assertFalse(completedFetch.isInitialized());
// Fetch the data and validate that we get all the records we want back.
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
assertFalse(fetch.isEmpty());
assertEquals(recordCount, fetch.numRecords());
// Collecting the data from the buffer causes the completed fetch to be initialized.
assertTrue(completedFetch.isInitialized());
// However, even though we've collected the data, it isn't (completely) consumed yet.
assertFalse(completedFetch.isConsumed());
// The buffer is now considered "empty" because our queue is empty.
assertTrue(fetchBuffer.isEmpty());
assertNull(fetchBuffer.peek());
assertNull(fetchBuffer.poll());
// However, while the queue is "empty", the next-in-line fetch is actually still in the buffer.
assertNotNull(fetchBuffer.nextInLineFetch());
// Validate that the next fetch position has been updated to point to the record after our last fetched
// record.
SubscriptionState.FetchPosition position = subscriptions.position(topicAPartition0);
assertEquals(recordCount, position.offset);
// Now attempt to collect more records from the fetch buffer.
fetch = fetchCollector.collectFetch(fetchBuffer);
// The Fetch object is non-null, but it's empty.
assertEquals(0, fetch.numRecords());
assertTrue(fetch.isEmpty());
// However, once we read *past* the end of the records in the CompletedFetch, then we will call
// drain on it, and it will be considered all consumed.
assertTrue(completedFetch.isConsumed());
}
@Test
public void testFetchWithReadReplica() {
buildDependencies();
assignAndSeek(topicAPartition0);
// Set the preferred read replica and just to be safe, verify it was set.
int preferredReadReplicaId = 67;
subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
assertEquals(Optional.of(preferredReadReplicaId), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
CompletedFetch completedFetch = completedFetchBuilder.build();
fetchBuffer.add(completedFetch);
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
// The Fetch should contain all of the records, and the preferred read replica should remain set.
assertEquals(DEFAULT_RECORD_COUNT, fetch.numRecords());
assertEquals(Optional.of(preferredReadReplicaId), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
}
@Test
public void testNoResultsIfInitializing() {
buildDependencies();
// Intentionally call assign (vs. assignAndSeek) so that we don't set the position. The SubscriptionState
// will consider the partition as in the SubscriptionState.FetchStates.INITIALIZED state.
assign(topicAPartition0);
// The position should thus be null and considered un-fetchable and invalid.
assertNull(subscriptions.position(topicAPartition0));
assertFalse(subscriptions.isFetchable(topicAPartition0));
assertFalse(subscriptions.hasValidPosition(topicAPartition0));
// Add some valid CompletedFetch records to the FetchBuffer queue and collect them into the Fetch.
CompletedFetch completedFetch = completedFetchBuilder.build();
fetchBuffer.add(completedFetch);
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
// Verify that no records are fetched for the partition as it did not have a valid position set.
assertEquals(0, fetch.numRecords());
}
@ParameterizedTest
@MethodSource("testErrorInInitializeSource")
public void testErrorInInitialize(int recordCount, RuntimeException expectedException) {
buildDependencies();
assignAndSeek(topicAPartition0);
// Create a FetchCollector that fails on CompletedFetch initialization.
fetchCollector = new FetchCollector<String, String>(logContext,
metadata,
subscriptions,
fetchConfig,
metricsManager,
time) {
@Override
protected CompletedFetch initialize(final CompletedFetch completedFetch) {
throw expectedException;
}
};
// Add the CompletedFetch to the FetchBuffer queue
CompletedFetch completedFetch = completedFetchBuilder
.recordCount(recordCount)
.build();
fetchBuffer.add(completedFetch);
// At first, the queue is populated
assertFalse(fetchBuffer.isEmpty());
// Now run our ill-fated collectFetch.
assertThrows(expectedException.getClass(), () -> fetchCollector.collectFetch(fetchBuffer));
// If the number of records in the CompletedFetch was 0, the call to FetchCollector.collectFetch() will
// remove it from the queue. If there are records in the CompletedFetch, FetchCollector.collectFetch will
// leave it on the queue.
assertEquals(recordCount == 0, fetchBuffer.isEmpty());
}
@Test
public void testFetchingPausedPartitionsYieldsNoRecords() {
buildDependencies();
assignAndSeek(topicAPartition0);
// The partition should not be 'paused' in the SubscriptionState until we explicitly tell it to.
assertFalse(subscriptions.isPaused(topicAPartition0));
subscriptions.pause(topicAPartition0);
assertTrue(subscriptions.isPaused(topicAPartition0));
CompletedFetch completedFetch = completedFetchBuilder.build();
// Set the CompletedFetch to the next-in-line fetch, *not* the queue.
fetchBuffer.setNextInLineFetch(completedFetch);
// The next-in-line CompletedFetch should reference the same object that was just created
assertSame(fetchBuffer.nextInLineFetch(), completedFetch);
// The FetchBuffer queue should be empty as the CompletedFetch was added as the next-in-line
// CompletedFetch, not to the queue.
assertTrue(fetchBuffer.isEmpty());
// Ensure that the partition for the next-in-line CompletedFetch is still 'paused'.
assertTrue(subscriptions.isPaused(completedFetch.partition));
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
// There should be no records in the Fetch as the partition being fetched is 'paused'.
assertEquals(0, fetch.numRecords());
// The FetchBuffer queue should not be empty; the CompletedFetch is added to the FetchBuffer queue by
// the FetchCollector when it detects a 'paused' partition.
assertFalse(fetchBuffer.isEmpty());
// The next-in-line CompletedFetch should be null; the CompletedFetch is added to the FetchBuffer
// queue by the FetchCollector when it detects a 'paused' partition.
assertNull(fetchBuffer.nextInLineFetch());
}
@ParameterizedTest
@MethodSource("testFetchWithMetadataRefreshErrorsSource")
public void testFetchWithMetadataRefreshErrors(final Errors error) {
buildDependencies();
assignAndSeek(topicAPartition0);
CompletedFetch completedFetch = completedFetchBuilder
.error(error)
.build();
fetchBuffer.add(completedFetch);
// Set the preferred read replica and just to be safe, verify it was set.
int preferredReadReplicaId = 5;
subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
assertEquals(Optional.of(preferredReadReplicaId), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
// Fetch the data and validate that we get all the records we want back.
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
assertTrue(fetch.isEmpty());
assertTrue(metadata.updateRequested());
assertEquals(Optional.empty(), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
}
@Test
public void testFetchWithOffsetOutOfRange() {
buildDependencies();
assignAndSeek(topicAPartition0);
CompletedFetch completedFetch = completedFetchBuilder.build();
fetchBuffer.add(completedFetch);
// Fetch the data and validate that we get our first batch of records back.
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
assertFalse(fetch.isEmpty());
assertEquals(DEFAULT_RECORD_COUNT, fetch.numRecords());
// Try to fetch more data and validate that we get an empty Fetch back.
completedFetch = completedFetchBuilder
.fetchOffset(fetch.numRecords())
.error(Errors.OFFSET_OUT_OF_RANGE)
.build();
fetchBuffer.add(completedFetch);
fetch = fetchCollector.collectFetch(fetchBuffer);
assertTrue(fetch.isEmpty());
// Try to fetch more data and validate that we get an empty Fetch back.
completedFetch = completedFetchBuilder
.fetchOffset(fetch.numRecords())
.error(Errors.OFFSET_OUT_OF_RANGE)
.build();
fetchBuffer.add(completedFetch);
fetch = fetchCollector.collectFetch(fetchBuffer);
assertTrue(fetch.isEmpty());
}
@Test
public void testFetchWithOffsetOutOfRangeWithPreferredReadReplica() {
int records = 10;
buildDependencies(records);
assignAndSeek(topicAPartition0);
// Set the preferred read replica and just to be safe, verify it was set.
int preferredReadReplicaId = 67;
subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
assertEquals(Optional.of(preferredReadReplicaId), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
CompletedFetch completedFetch = completedFetchBuilder
.error(Errors.OFFSET_OUT_OF_RANGE)
.build();
fetchBuffer.add(completedFetch);
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
// The Fetch and read replica settings should be empty.
assertTrue(fetch.isEmpty());
assertEquals(Optional.empty(), subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
}
@Test
public void testFetchWithTopicAuthorizationFailed() {
buildDependencies();
assignAndSeek(topicAPartition0);
// Try to fetch data and validate that a TopicAuthorizationException is thrown.
CompletedFetch completedFetch = completedFetchBuilder
.error(Errors.TOPIC_AUTHORIZATION_FAILED)
.build();
fetchBuffer.add(completedFetch);
assertThrows(TopicAuthorizationException.class, () -> fetchCollector.collectFetch(fetchBuffer));
}
@Test
public void testFetchWithUnknownLeaderEpoch() {
buildDependencies();
assignAndSeek(topicAPartition0);
// Try to fetch data and validate that we get an empty Fetch back.
CompletedFetch completedFetch = completedFetchBuilder
.error(Errors.UNKNOWN_LEADER_EPOCH)
.build();
fetchBuffer.add(completedFetch);
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
assertTrue(fetch.isEmpty());
}
@Test
public void testFetchWithUnknownServerError() {
buildDependencies();
assignAndSeek(topicAPartition0);
// Try to fetch data and validate that we get an empty Fetch back.
CompletedFetch completedFetch = completedFetchBuilder
.error(Errors.UNKNOWN_SERVER_ERROR)
.build();
fetchBuffer.add(completedFetch);
Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
assertTrue(fetch.isEmpty());
}
@Test
public void testFetchWithCorruptMessage() {
buildDependencies();
assignAndSeek(topicAPartition0);
// Try to fetch data and validate that a KafkaException is thrown.
CompletedFetch completedFetch = completedFetchBuilder
.error(Errors.CORRUPT_MESSAGE)
.build();
fetchBuffer.add(completedFetch);
assertThrows(KafkaException.class, () -> fetchCollector.collectFetch(fetchBuffer));
}
@ParameterizedTest
@MethodSource("testFetchWithOtherErrorsSource")
public void testFetchWithOtherErrors(final Errors error) {
buildDependencies();
assignAndSeek(topicAPartition0);
CompletedFetch completedFetch = completedFetchBuilder
.error(error)
.build();
fetchBuffer.add(completedFetch);
assertThrows(IllegalStateException.class, () -> fetchCollector.collectFetch(fetchBuffer));
}
/**
* This is a handy utility method for returning a set from a varargs array.
*/
private static Set<TopicPartition> partitions(TopicPartition... partitions) {
return new HashSet<>(Arrays.asList(partitions));
}
private void buildDependencies() {
buildDependencies(DEFAULT_MAX_POLL_RECORDS);
}
private void buildDependencies(int maxPollRecords) {
logContext = new LogContext();
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords));
ConsumerConfig config = new ConsumerConfig(p);
Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
subscriptions = createSubscriptionState(config, logContext);
fetchConfig = createFetchConfig(config, deserializers);
Metrics metrics = createMetrics(config, time);
metricsManager = createFetchMetricsManager(metrics);
metadata = new ConsumerMetadata(
0,
1000,
10000,
false,
false,
subscriptions,
logContext,
new ClusterResourceListeners());
fetchCollector = new FetchCollector<>(
logContext,
metadata,
subscriptions,
fetchConfig,
metricsManager,
time);
fetchBuffer = new FetchBuffer(logContext);
completedFetchBuilder = new CompletedFetchBuilder();
}
private void assign(TopicPartition... partitions) {
subscriptions.assignFromUser(partitions(partitions));
}
private void assignAndSeek(TopicPartition tp) {
assign(tp);
subscriptions.seek(tp, 0);
}
/**
* Supplies the {@link Arguments} to {@link #testFetchWithMetadataRefreshErrors(Errors)}.
*/
private static Stream<Arguments> testFetchWithMetadataRefreshErrorsSource() {
List<Errors> errors = Arrays.asList(
Errors.NOT_LEADER_OR_FOLLOWER,
Errors.REPLICA_NOT_AVAILABLE,
Errors.KAFKA_STORAGE_ERROR,
Errors.FENCED_LEADER_EPOCH,
Errors.OFFSET_NOT_AVAILABLE,
Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_ID,
Errors.INCONSISTENT_TOPIC_ID
);
return errors.stream().map(Arguments::of);
}
/**
* Supplies the {@link Arguments} to {@link #testFetchWithOtherErrors(Errors)}.
*/
private static Stream<Arguments> testFetchWithOtherErrorsSource() {
List<Errors> errors = new ArrayList<>(Arrays.asList(Errors.values()));
errors.removeAll(Arrays.asList(
Errors.NONE,
Errors.NOT_LEADER_OR_FOLLOWER,
Errors.REPLICA_NOT_AVAILABLE,
Errors.KAFKA_STORAGE_ERROR,
Errors.FENCED_LEADER_EPOCH,
Errors.OFFSET_NOT_AVAILABLE,
Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_ID,
Errors.INCONSISTENT_TOPIC_ID,
Errors.OFFSET_OUT_OF_RANGE,
Errors.TOPIC_AUTHORIZATION_FAILED,
Errors.UNKNOWN_LEADER_EPOCH,
Errors.UNKNOWN_SERVER_ERROR,
Errors.CORRUPT_MESSAGE
));
return errors.stream().map(Arguments::of);
}
/**
* Supplies the {@link Arguments} to {@link #testErrorInInitialize(int, RuntimeException)}.
*/
private static Stream<Arguments> testErrorInInitializeSource() {
return Stream.of(
Arguments.of(10, new RuntimeException()),
Arguments.of(0, new RuntimeException()),
Arguments.of(10, new KafkaException()),
Arguments.of(0, new KafkaException())
);
}
private class CompletedFetchBuilder {
private long fetchOffset = 0;
private int recordCount = DEFAULT_RECORD_COUNT;
private Errors error = null;
private CompletedFetchBuilder fetchOffset(long fetchOffset) {
this.fetchOffset = fetchOffset;
return this;
}
private CompletedFetchBuilder recordCount(int recordCount) {
this.recordCount = recordCount;
return this;
}
private CompletedFetchBuilder error(Errors error) {
this.error = error;
return this;
}
private CompletedFetch build() {
Records records;
ByteBuffer allocate = ByteBuffer.allocate(1024);
try (MemoryRecordsBuilder builder = MemoryRecords.builder(allocate,
CompressionType.NONE,
TimestampType.CREATE_TIME,
0)) {
for (int i = 0; i < recordCount; i++)
builder.append(0L, "key".getBytes(), ("value-" + i).getBytes());
records = builder.build();
}
FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
.setPartitionIndex(topicAPartition0.partition())
.setHighWatermark(1000)
.setRecords(records);
if (error != null)
partitionData.setErrorCode(error.code());
FetchMetricsAggregator metricsAggregator = new FetchMetricsAggregator(metricsManager, allPartitions);
return new CompletedFetch(
logContext,
subscriptions,
BufferSupplier.create(),
topicAPartition0,
partitionData,
metricsAggregator,
fetchOffset,
ApiKeys.FETCH.latestVersion());
}
}
}
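testFetchNormal above narrates the collect lifecycle: a CompletedFetch is promoted out of the FetchBuffer queue into the next-in-line slot, drained up to max.poll.records, and only treated as consumed once reads go past its last record. The sketch below is not the real FetchCollector; it is a simplified, invented drain loop (with iterator-based stand-ins for the fetch types) that shows how a partially drained batch stays as next-in-line for the following collect call.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Illustrative only: a simplified version of the "collect" pattern the tests above exercise.
class CollectSketch<R> {

    private Iterator<R> nextInLine;   // stands in for the buffer's next-in-line batch

    List<R> collect(Queue<Iterator<R>> queue, int maxPollRecords) {
        List<R> drained = new ArrayList<>();

        while (drained.size() < maxPollRecords) {
            if (nextInLine == null || !nextInLine.hasNext()) {
                // The current batch is exhausted (or absent); promote the next one from the queue.
                nextInLine = queue.poll();
                if (nextInLine == null)
                    break;            // nothing buffered, return what we have so far
            }

            // Drain the promoted batch up to the record limit. If records remain, the batch is
            // retained as next-in-line and picked up again on the following collect() call.
            while (nextInLine.hasNext() && drained.size() < maxPollRecords)
                drained.add(nextInLine.next());
        }

        return drained;
    }
}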

2
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java

@@ -101,7 +101,7 @@ public class OffsetsRequestManagerTest {
apiVersions = mock(ApiVersions.class);
requestManager = new OffsetsRequestManager(subscriptionState, metadata,
DEFAULT_ISOLATION_LEVEL, time, RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS,
- apiVersions, new LogContext());
+ apiVersions, mock(NetworkClientDelegate.class), new LogContext());
}
@Test

79
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java

@@ -21,7 +21,9 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
@@ -44,6 +46,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedConstruction;
import org.mockito.stubbing.Answer;
import java.time.Duration;
import java.util.Collections;
@@ -52,7 +55,10 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import static java.util.Collections.singleton;
@@ -95,7 +101,10 @@ public class PrototypeAsyncConsumerTest {
this.config = new ConsumerConfig(consumerProps);
this.logContext = new LogContext();
this.subscriptions = mock(SubscriptionState.class);
- this.eventHandler = mock(DefaultEventHandler.class);
+ final DefaultBackgroundThread bt = mock(DefaultBackgroundThread.class);
+ final BlockingQueue<ApplicationEvent> aq = new LinkedBlockingQueue<>();
+ final BlockingQueue<BackgroundEvent> bq = new LinkedBlockingQueue<>();
+ this.eventHandler = spy(new DefaultEventHandler(bt, aq, bq));
this.metrics = new Metrics(time);
}
@@ -157,34 +166,64 @@ public class PrototypeAsyncConsumerTest {
@Test
public void testCommitted() {
- Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
+ Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
- try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction =
- mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
- when(mock.future()).thenReturn(committedFuture);
- })) {
- committedFuture.complete(mockTopicPartitionOffset());
- consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
- assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1000)));
+ committedFuture.complete(offsets);
+ try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+ this.consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+ assertDoesNotThrow(() -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
}
}
@Test
public void testCommitted_ExceptionThrown() {
- Set<TopicPartition> mockTopicPartitions = mockTopicPartitionOffset().keySet();
+ Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
- try (MockedConstruction<OffsetFetchApplicationEvent> mockConstruction =
- mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
- when(mock.future()).thenReturn(committedFuture);
- })) {
- committedFuture.completeExceptionally(new KafkaException("Test exception"));
- consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
- assertThrows(KafkaException.class, () -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1000)));
+ committedFuture.completeExceptionally(new KafkaException("Test exception"));
+ try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+ this.consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
+ assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
}
}
/**
* This is a rather ugly bit of code. Not my choice :(
*
* <p/>
*
* Inside the {@link org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we create an
* instance of {@link OffsetFetchApplicationEvent} that holds the partitions and internally holds a
* {@link CompletableFuture}. We want to test different behaviours of the {@link Future#get()}, such as
* returning normally, timing out, throwing an error, etc. By mocking the construction of the event object that
* is created, we can affect that behavior.
*/
private static MockedConstruction<OffsetFetchApplicationEvent> offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
// This "answer" is where we pass the future to be invoked by the ConsumerUtils.getResult() method
Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer = invocation -> {
// This argument captures the actual argument value that was passed to the event's get() method, so we
// just "forward" that value to our mocked call
Timer timer = invocation.getArgument(0);
return ConsumerUtils.getResult(future, timer);
};
MockedConstruction.MockInitializer<OffsetFetchApplicationEvent> mockInitializer = (mock, ctx) -> {
// When the event's get() method is invoked, we call the "answer" method just above
when(mock.get(any())).thenAnswer(getInvocationAnswer);
// When the event's type() method is invoked, we have to return the type as it will be null in the mock
when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSET);
// This is needed for the WakeupTrigger code that keeps track of the active task
when(mock.future()).thenReturn(future);
};
return mockConstruction(OffsetFetchApplicationEvent.class, mockInitializer);
}
@Test
public void testAssign() {
this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
@@ -326,7 +365,7 @@ public class PrototypeAsyncConsumerTest {
assertTrue(wakeupTrigger.getPendingTask() == null);
}
- private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
+ private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
@@ -335,7 +374,7 @@ public class PrototypeAsyncConsumerTest {
return topicPartitionOffsets;
}
- private HashMap<TopicPartition, OffsetAndTimestamp> mockOffsetAndTimestamp() {
+ private Map<TopicPartition, OffsetAndTimestamp> mockOffsetAndTimestamp() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
HashMap<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = new HashMap<>();
@@ -344,7 +383,7 @@ public class PrototypeAsyncConsumerTest {
return offsetAndTimestamp;
}
- private HashMap<TopicPartition, Long> mockTimestampToSearch() {
+ private Map<TopicPartition, Long> mockTimestampToSearch() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();

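The offsetFetchEventMocker() helper above leans on Mockito's MockedConstruction to intercept objects that the consumer constructs internally. A minimal, standalone sketch of the same technique follows; the Greeter class and its stubbed return values are invented for illustration, and it assumes a Mockito version with mockConstruction support (mockito-inline or Mockito 5+).

import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;

public class MockedConstructionSketchTest {

    // A hypothetical class that the code under test instantiates internally.
    static class Greeter {
        String greet(String name) {
            return "Hello, " + name;
        }
    }

    @Test
    public void testInterceptConstruction() {
        // While the try block is open, every `new Greeter()` produces a mock configured by the
        // initializer below, which is the same trick offsetFetchEventMocker() plays on
        // OffsetFetchApplicationEvent inside the consumer.
        try (MockedConstruction<Greeter> ignored = mockConstruction(Greeter.class,
                (mock, context) -> when(mock.greet("world")).thenReturn("mocked"))) {
            Greeter greeter = new Greeter();
            assertEquals("mocked", greeter.greet("world"));
        }
    }
}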
183
clients/src/test/java/org/apache/kafka/common/internals/IdempotentCloserTest.java

@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class IdempotentCloserTest {
private static final Runnable CALLBACK_NO_OP = () -> { };
private static final Runnable CALLBACK_WITH_RUNTIME_EXCEPTION = () -> {
throw new RuntimeException("Simulated error during callback");
};
/**
* Tests basic functionality, i.e. that close <em>means</em> closed.
*/
@Test
public void testBasicClose() {
IdempotentCloser ic = new IdempotentCloser();
assertFalse(ic.isClosed());
ic.close();
assertTrue(ic.isClosed());
}
/**
* Tests that the onClose callback is only invoked once.
*/
@Test
public void testCountCloses() {
AtomicInteger onCloseCounter = new AtomicInteger();
IdempotentCloser ic = new IdempotentCloser();
// Verify initial invariants.
assertFalse(ic.isClosed());
assertEquals(0, onCloseCounter.get());
// Close with our onClose callback to increment our counter.
ic.close(onCloseCounter::getAndIncrement);
assertTrue(ic.isClosed());
assertEquals(1, onCloseCounter.get());
// Close with our onClose callback again, but verify it wasn't invoked as it was previously closed.
ic.close(onCloseCounter::getAndIncrement);
assertTrue(ic.isClosed());
assertEquals(1, onCloseCounter.get());
}
/**
* Tests that the onClose callback is only invoked once, while the onPreviousClose callback can be invoked
* a variable number of times.
*/
@Test
public void testEnsureIdempotentClose() {
AtomicInteger onCloseCounter = new AtomicInteger();
AtomicInteger onPreviousCloseCounter = new AtomicInteger();
IdempotentCloser ic = new IdempotentCloser();
// Verify initial invariants.
assertFalse(ic.isClosed());
assertEquals(0, onCloseCounter.get());
assertEquals(0, onPreviousCloseCounter.get());
// Our first close passes in both callbacks. As a result, our onClose callback should be run but our
// onPreviousClose callback should not be invoked.
ic.close(onCloseCounter::getAndIncrement, onPreviousCloseCounter::getAndIncrement);
assertTrue(ic.isClosed());
assertEquals(1, onCloseCounter.get());
assertEquals(0, onPreviousCloseCounter.get());
// Our second close again passes in both callbacks. As this is the second close, our onClose callback
// should not be run but our onPreviousClose callback should be executed.
ic.close(onCloseCounter::getAndIncrement, onPreviousCloseCounter::getAndIncrement);
assertTrue(ic.isClosed());
assertEquals(1, onCloseCounter.get());
assertEquals(1, onPreviousCloseCounter.get());
// Our third close yet again passes in both callbacks. As before, our onClose callback should not be run
// but our onPreviousClose callback should be run again.
ic.close(onCloseCounter::getAndIncrement, onPreviousCloseCounter::getAndIncrement);
assertTrue(ic.isClosed());
assertEquals(1, onCloseCounter.get());
assertEquals(2, onPreviousCloseCounter.get());
}
/**
* Tests that the {@link IdempotentCloser#assertOpen(String)} method will not throw an
* exception while the closer is in the "open" state, but will throw an {@link IllegalStateException}
* once the closer is in the "closed" state.
*/
@Test
public void testCloseBeforeThrows() {
IdempotentCloser ic = new IdempotentCloser();
// Verify initial invariants.
assertFalse(ic.isClosed());
// assertOpen() doesn't throw anything since the closer is still in its "open" state.
assertDoesNotThrow(() -> ic.assertOpen(() -> "test"));
// Post-close, our call to assertOpen() will, in fact, throw an IllegalStateException.
ic.close();
assertTrue(ic.isClosed());
assertThrows(IllegalStateException.class, () -> ic.assertOpen(() -> "test"));
}
/**
* Tests that if the invoked onClose callback throws an exception:
*
* <ol>
* <li>The exception does not prevent the {@link IdempotentCloser} from being updated to the closed state</li>
* <li>The exception is bubbled up to the user</li>
* </ol>
*/
@Test
public void testErrorsInOnCloseCallbacksAreNotSwallowed() {
IdempotentCloser ic = new IdempotentCloser();
// Verify initial invariants.
assertFalse(ic.isClosed());
// Upon close, our onClose callback will throw an error. First ensure that it is thrown at the user.
assertThrows(RuntimeException.class, () -> ic.close(CALLBACK_WITH_RUNTIME_EXCEPTION));
// Make sure the IdempotentCloser is still closed, though.
assertTrue(ic.isClosed());
}
/**
* Tests that if the invoked onSubsequentClose callback throws an exception, it is thrown from
* {@link IdempotentCloser#close(Runnable, Runnable)} so the user can handle it.
*/
@Test
public void testErrorsInOnPreviousCloseCallbacksAreNotSwallowed() {
IdempotentCloser ic = new IdempotentCloser();
// Verify initial invariants.
assertFalse(ic.isClosed());
// Perform the initial close. No errors here.
ic.close(CALLBACK_NO_OP);
assertTrue(ic.isClosed());
// Perform the subsequent close and verify that the exception is bubbled up to the user.
assertThrows(RuntimeException.class, () -> ic.close(CALLBACK_NO_OP, CALLBACK_WITH_RUNTIME_EXCEPTION));
assertTrue(ic.isClosed());
}
/**
* Tests that if the {@link IdempotentCloser} is created with its initial state as closed, the various APIs
* will behave as expected.
*/
@Test
public void testCreatedClosed() {
IdempotentCloser ic = new IdempotentCloser(true);
assertTrue(ic.isClosed());
assertThrows(IllegalStateException.class, () -> ic.assertOpen(() -> "test"));
assertDoesNotThrow(() -> ic.close(CALLBACK_WITH_RUNTIME_EXCEPTION));
}
}
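The tests above pin down the IdempotentCloser contract: the first close(Runnable) runs its callback, later calls skip it, and assertOpen throws an IllegalStateException once closed. Below is a brief sketch of how a component might use it; the ExampleResource class and its messages are invented for illustration.

import java.io.Closeable;

import org.apache.kafka.common.internals.IdempotentCloser;

// Invented example: a resource that delegates its close-once bookkeeping to IdempotentCloser.
public class ExampleResource implements Closeable {

    private final IdempotentCloser closer = new IdempotentCloser();

    public void doWork() {
        // Fails fast with an IllegalStateException if the resource was already closed.
        closer.assertOpen(() -> "ExampleResource must not be used after close()");
        // ... real work would go here ...
    }

    @Override
    public void close() {
        // The first call releases resources; any later call runs only the second callback.
        closer.close(
            () -> System.out.println("releasing resources"),
            () -> System.out.println("close() called again; nothing to do"));
    }
}

Delegating the bookkeeping this way keeps close() safe to call from multiple paths without releasing the underlying resources more than once.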