Browse Source

KAFKA-14274 #1: basic refactoring (#14305)

This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project.

Reviewers: Christo Lolov <lolovc@amazon.com>, Jun Rao <junrao@gmail.com>
pull/14358/head
Kirk True 1 year ago committed by GitHub
parent
commit
a2de7d32c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
  2. 6
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  3. 22
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
  4. 26
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
  5. 32
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
  6. 7
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  7. 12
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
  8. 68
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java
  9. 10
      clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
  10. 3
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java

18
clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java

@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.utils.Time;
@ -114,4 +115,21 @@ public final class NetworkClientUtils { @@ -114,4 +115,21 @@ public final class NetworkClientUtils {
}
}
/**
* Check if the code is disconnected and unavailable for immediate reconnection (i.e. if it is in
* reconnect backoff window following the disconnect).
*/
public static boolean isUnavailable(KafkaClient client, Node node, Time time) {
return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
}
/**
* Check for an authentication error on a given node and raise the exception if there is one.
*/
public static void maybeThrowAuthFailure(KafkaClient client, Node node) {
AuthenticationException exception = client.authenticationException(node);
if (exception != null)
throw exception;
}
}

6
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@ -639,9 +639,9 @@ public class ConsumerConfig extends AbstractConfig { @@ -639,9 +639,9 @@ public class ConsumerConfig extends AbstractConfig {
}
}
protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
Map<String, Object> newConfigs = new HashMap<>(configs);
if (keyDeserializer != null)

22
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java

@ -274,11 +274,11 @@ public abstract class AbstractFetch<K, V> implements Closeable { @@ -274,11 +274,11 @@ public abstract class AbstractFetch<K, V> implements Closeable {
try {
while (recordsRemaining > 0) {
if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
if (nextInLineFetch == null || nextInLineFetch.isConsumed()) {
CompletedFetch<K, V> records = completedFetches.peek();
if (records == null) break;
if (!records.initialized) {
if (!records.isInitialized()) {
try {
nextInLineFetch = initializeCompletedFetch(records);
} catch (Exception e) {
@ -336,7 +336,7 @@ public abstract class AbstractFetch<K, V> implements Closeable { @@ -336,7 +336,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
throw new IllegalStateException("Missing position for fetchable partition " + nextInLineFetch.partition);
}
if (nextInLineFetch.nextFetchOffset == position.offset) {
if (nextInLineFetch.nextFetchOffset() == position.offset) {
List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(maxRecords);
log.trace("Returning {} fetched records at offset {} for assigned partition {}",
@ -344,10 +344,10 @@ public abstract class AbstractFetch<K, V> implements Closeable { @@ -344,10 +344,10 @@ public abstract class AbstractFetch<K, V> implements Closeable {
boolean positionAdvanced = false;
if (nextInLineFetch.nextFetchOffset > position.offset) {
if (nextInLineFetch.nextFetchOffset() > position.offset) {
SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
nextInLineFetch.nextFetchOffset,
nextInLineFetch.lastEpoch,
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.lastEpoch(),
position.currentLeader);
log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
position, nextPosition, nextInLineFetch.partition, partRecords.size());
@ -369,7 +369,7 @@ public abstract class AbstractFetch<K, V> implements Closeable { @@ -369,7 +369,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
nextInLineFetch.partition, nextInLineFetch.nextFetchOffset, position);
nextInLineFetch.partition, nextInLineFetch.nextFetchOffset(), position);
}
}
@ -381,7 +381,7 @@ public abstract class AbstractFetch<K, V> implements Closeable { @@ -381,7 +381,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
private List<TopicPartition> fetchablePartitions() {
Set<TopicPartition> exclude = new HashSet<>();
if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
exclude.add(nextInLineFetch.partition);
}
for (CompletedFetch<K, V> completedFetch : completedFetches) {
@ -528,7 +528,7 @@ public abstract class AbstractFetch<K, V> implements Closeable { @@ -528,7 +528,7 @@ public abstract class AbstractFetch<K, V> implements Closeable {
private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final CompletedFetch<K, V> completedFetch) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset;
final long fetchOffset = completedFetch.nextFetchOffset();
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
@ -586,14 +586,14 @@ public abstract class AbstractFetch<K, V> implements Closeable { @@ -586,14 +586,14 @@ public abstract class AbstractFetch<K, V> implements Closeable {
});
}
completedFetch.initialized = true;
completedFetch.setInitialized();
return completedFetch;
}
private void handleInitializeCompletedFetchErrors(final CompletedFetch<K, V> completedFetch,
final Errors error) {
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset;
final long fetchOffset = completedFetch.nextFetchOffset();
if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||

26
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

@ -53,7 +53,8 @@ public class CommitRequestManager implements RequestManager { @@ -53,7 +53,8 @@ public class CommitRequestManager implements RequestManager {
// TODO: current in ConsumerConfig but inaccessible in the internal package.
private static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
// TODO: We will need to refactor the subscriptionState
private final SubscriptionState subscriptionState;
private final SubscriptionState subscriptions;
private final LogContext logContext;
private final Logger log;
private final Optional<AutoCommitState> autoCommitState;
private final CoordinatorRequestManager coordinatorRequestManager;
@ -66,11 +67,12 @@ public class CommitRequestManager implements RequestManager { @@ -66,11 +67,12 @@ public class CommitRequestManager implements RequestManager {
public CommitRequestManager(
final Time time,
final LogContext logContext,
final SubscriptionState subscriptionState,
final SubscriptionState subscriptions,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final GroupState groupState) {
Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
this.logContext = logContext;
this.log = logContext.logger(getClass());
this.pendingRequests = new PendingRequests();
if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
@ -82,7 +84,7 @@ public class CommitRequestManager implements RequestManager { @@ -82,7 +84,7 @@ public class CommitRequestManager implements RequestManager {
}
this.coordinatorRequestManager = coordinatorRequestManager;
this.groupState = groupState;
this.subscriptionState = subscriptionState;
this.subscriptions = subscriptions;
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.throwOnFetchStableOffsetUnsupported = config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
@ -99,7 +101,7 @@ public class CommitRequestManager implements RequestManager { @@ -99,7 +101,7 @@ public class CommitRequestManager implements RequestManager {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
maybeAutoCommit(this.subscriptionState.allConsumed());
maybeAutoCommit(this.subscriptions.allConsumed());
if (!pendingRequests.hasUnsentRequests()) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
@ -167,9 +169,9 @@ public class CommitRequestManager implements RequestManager { @@ -167,9 +169,9 @@ public class CommitRequestManager implements RequestManager {
})
.exceptionally(t -> {
if (t instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, t);
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, t.getMessage());
} else {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, t.getMessage());
log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, t);
}
return null;
});
@ -241,7 +243,7 @@ public class CommitRequestManager implements RequestManager { @@ -241,7 +243,7 @@ public class CommitRequestManager implements RequestManager {
final GroupState.Generation generation,
final long retryBackoffMs,
final long retryBackoffMaxMs) {
super(retryBackoffMs, retryBackoffMaxMs);
super(logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, retryBackoffMaxMs);
this.requestedPartitions = partitions;
this.requestedGeneration = generation;
this.future = new CompletableFuture<>();
@ -366,6 +368,16 @@ public class CommitRequestManager implements RequestManager { @@ -366,6 +368,16 @@ public class CommitRequestManager implements RequestManager {
}
});
}
@Override
public String toString() {
return "OffsetFetchRequestState{" +
"requestedPartitions=" + requestedPartitions +
", requestedGeneration=" + requestedGeneration +
", future=" + future +
", " + toStringBase() +
'}';
}
}
/**

32
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java

@ -57,17 +57,12 @@ import java.util.Set; @@ -57,17 +57,12 @@ import java.util.Set;
* @param <K> Record key type
* @param <V> Record value type
*/
class CompletedFetch<K, V> {
public class CompletedFetch<K, V> {
final TopicPartition partition;
final FetchResponseData.PartitionData partitionData;
final short requestVersion;
long nextFetchOffset;
Optional<Integer> lastEpoch;
boolean isConsumed = false;
boolean initialized = false;
private final Logger log;
private final SubscriptionState subscriptions;
private final FetchConfig<K, V> fetchConfig;
@ -84,6 +79,10 @@ class CompletedFetch<K, V> { @@ -84,6 +79,10 @@ class CompletedFetch<K, V> {
private CloseableIterator<Record> records;
private Exception cachedRecordException = null;
private boolean corruptLastRecord = false;
private long nextFetchOffset;
private Optional<Integer> lastEpoch;
private boolean isConsumed = false;
private boolean initialized = false;
CompletedFetch(LogContext logContext,
SubscriptionState subscriptions,
@ -109,6 +108,27 @@ class CompletedFetch<K, V> { @@ -109,6 +108,27 @@ class CompletedFetch<K, V> {
this.abortedTransactions = abortedTransactions(partitionData);
}
long nextFetchOffset() {
return nextFetchOffset;
}
Optional<Integer> lastEpoch() {
return lastEpoch;
}
boolean isInitialized() {
return initialized;
}
void setInitialized() {
this.initialized = true;
}
public boolean isConsumed() {
return isConsumed;
}
/**
* After each partition is parsed, we update the current metric totals with the total bytes
* and number of records parsed. After all partitions have reported, we write the metric.

7
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java

@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientRequest; @@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@ -558,7 +559,7 @@ public class ConsumerNetworkClient implements Closeable { @@ -558,7 +559,7 @@ public class ConsumerNetworkClient implements Closeable {
public boolean isUnavailable(Node node) {
lock.lock();
try {
return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
return NetworkClientUtils.isUnavailable(client, node, time);
} finally {
lock.unlock();
}
@ -570,9 +571,7 @@ public class ConsumerNetworkClient implements Closeable { @@ -570,9 +571,7 @@ public class ConsumerNetworkClient implements Closeable {
public void maybeThrowAuthFailure(Node node) {
lock.lock();
try {
AuthenticationException exception = client.authenticationException(node);
if (exception != null)
throw exception;
NetworkClientUtils.maybeThrowAuthFailure(client, node);
} finally {
lock.unlock();
}

12
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java

@ -34,13 +34,13 @@ import java.util.Optional; @@ -34,13 +34,13 @@ import java.util.Optional;
/**
* This is responsible for timing to send the next {@link FindCoordinatorRequest} based on the following criteria:
*
* <p/>
* Whether there is an existing coordinator.
* Whether there is an inflight request.
* Whether the backoff timer has expired.
* The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer
* or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}.
*
* <p/>
* The {@link FindCoordinatorRequest} will be handled by the {@link #onResponse(long, FindCoordinatorResponse)} callback, which
* subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be
* marked {@code null} upon receiving a failure.
@ -70,7 +70,12 @@ public class CoordinatorRequestManager implements RequestManager { @@ -70,7 +70,12 @@ public class CoordinatorRequestManager implements RequestManager {
this.log = logContext.logger(this.getClass());
this.nonRetriableErrorHandler = errorHandler;
this.groupId = groupId;
this.coordinatorRequestState = new RequestState(retryBackoffMs, retryBackoffMaxMs);
this.coordinatorRequestState = new RequestState(
logContext,
CoordinatorRequestManager.class.getSimpleName(),
retryBackoffMs,
retryBackoffMaxMs
);
}
/**
@ -218,5 +223,4 @@ public class CoordinatorRequestManager implements RequestManager { @@ -218,5 +223,4 @@ public class CoordinatorRequestManager implements RequestManager {
public Optional<Node> coordinator() {
return Optional.ofNullable(this.coordinator);
}
}

68
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java

@ -17,18 +17,27 @@ @@ -17,18 +17,27 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
class RequestState {
private final Logger log;
protected final String owner;
final static int RETRY_BACKOFF_EXP_BASE = 2;
final static double RETRY_BACKOFF_JITTER = 0.2;
private final ExponentialBackoff exponentialBackoff;
private long lastSentMs = -1;
private long lastReceivedMs = -1;
private int numAttempts = 0;
private long backoffMs = 0;
protected final ExponentialBackoff exponentialBackoff;
protected long lastSentMs = -1;
protected long lastReceivedMs = -1;
protected int numAttempts = 0;
protected long backoffMs = 0;
public RequestState(final long retryBackoffMs,
public RequestState(final LogContext logContext,
final String owner,
final long retryBackoffMs,
final long retryBackoffMaxMs) {
this.log = logContext.logger(RequestState.class);
this.owner = owner;
this.exponentialBackoff = new ExponentialBackoff(
retryBackoffMs,
RETRY_BACKOFF_EXP_BASE,
@ -37,10 +46,14 @@ class RequestState { @@ -37,10 +46,14 @@ class RequestState {
}
// Visible for testing
RequestState(final long retryBackoffMs,
RequestState(final LogContext logContext,
final String owner,
final long retryBackoffMs,
final int retryBackoffExpBase,
final long retryBackoffMaxMs,
final double jitter) {
this.log = logContext.logger(RequestState.class);
this.owner = owner;
this.exponentialBackoff = new ExponentialBackoff(
retryBackoffMs,
retryBackoffExpBase,
@ -65,13 +78,19 @@ class RequestState { @@ -65,13 +78,19 @@ class RequestState {
return true;
}
if (this.lastReceivedMs == -1 ||
this.lastReceivedMs < this.lastSentMs) {
// there is an inflight request
if (this.lastReceivedMs == -1 || this.lastReceivedMs < this.lastSentMs) {
log.trace("An inflight request already exists for {}", this);
return false;
}
return requestBackoffExpired(currentTimeMs);
long remainingBackoffMs = remainingBackoffMs(currentTimeMs);
if (remainingBackoffMs <= 0) {
return true;
} else {
log.trace("{} ms remain before another request should be sent for {}", remainingBackoffMs, this);
return false;
}
}
public void onSendAttempt(final long currentTimeMs) {
@ -105,12 +124,29 @@ class RequestState { @@ -105,12 +124,29 @@ class RequestState {
this.numAttempts++;
}
private boolean requestBackoffExpired(final long currentTimeMs) {
return remainingBackoffMs(currentTimeMs) <= 0;
}
long remainingBackoffMs(final long currentTimeMs) {
long timeSinceLastReceiveMs = currentTimeMs - this.lastReceivedMs;
return Math.max(0, backoffMs - timeSinceLastReceiveMs);
}
}
/**
* This method appends the instance variables together in a simple String of comma-separated key value pairs.
* This allows subclasses to include these values and not have to duplicate each variable, helping to prevent
* any variables from being omitted when new ones are added.
*
* @return String version of instance variables.
*/
protected String toStringBase() {
return "owner='" + owner + '\'' +
", exponentialBackoff=" + exponentialBackoff +
", lastSentMs=" + lastSentMs +
", lastReceivedMs=" + lastReceivedMs +
", numAttempts=" + numAttempts +
", backoffMs=" + backoffMs;
}
@Override
public String toString() {
return "RequestState{" + toStringBase() + '}';
}
}

10
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java

@ -58,4 +58,14 @@ public class ExponentialBackoff { @@ -58,4 +58,14 @@ public class ExponentialBackoff {
long backoffValue = (long) (randomFactor * term);
return backoffValue > maxInterval ? maxInterval : backoffValue;
}
@Override
public String toString() {
return "ExponentialBackoff{" +
"multiplier=" + multiplier +
", expMax=" + expMax +
", initialInterval=" + initialInterval +
", jitter=" + jitter +
'}';
}
}

3
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -25,6 +26,8 @@ public class RequestStateTest { @@ -25,6 +26,8 @@ public class RequestStateTest {
@Test
public void testRequestStateSimple() {
RequestState state = new RequestState(
new LogContext(),
this.getClass().getSimpleName(),
100,
2,
1000,

Loading…
Cancel
Save