KAFKA-14937; [2/N]: Refactoring for client code to reduce boilerplate (#14218)

The main refactorings in this PR are (a usage sketch follows the list):

1. serializers/deserializers used in clients - unified in a Deserializers class
2. logic for configuring ClusterResourceListeners moved to ClientUtils
3. misc refactoring of the new async consumer in preparation for upcoming Request Managers
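
A minimal sketch of how the consolidated pieces fit together (illustrative only, not code from this patch; the class name BoilerplateSketch and the property values are assumptions):

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;

public class BoilerplateSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        ConsumerConfig config = new ConsumerConfig(props);

        // (1) Key and value deserializers are now created and configured together.
        Deserializers<String, String> deserializers = new Deserializers<>(config);

        // (2) ClusterResourceListeners wiring is a shared ClientUtils helper instead of
        // near-identical private methods in KafkaConsumer and KafkaProducer.
        Metrics metrics = new Metrics();
        List<ConsumerInterceptor<String, String>> interceptors = configuredConsumerInterceptors(config);
        ClusterResourceListeners listeners = ClientUtils.configureClusterResourceListeners(
                metrics.reporters(),
                interceptors,
                Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
    }
}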

Reviewers: Jun Rao <junrao@gmail.com>
Lianet Magrans committed 1 year ago (commit e9f358eef6)
21 changed files (lines changed per file in parentheses):

  1. clients/src/main/java/org/apache/kafka/clients/ClientUtils.java (18)
  2. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (80)
  3. clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java (4)
  4. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java (63)
  5. clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java (12)
  6. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java (98)
  7. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java (2)
  8. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java (78)
  9. clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java (23)
  10. clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java (12)
  11. clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java (70)
  12. clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java (3)
  13. clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java (49)
  14. clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java (44)
  15. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (19)
  16. clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java (4)
  17. clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java (3)
  18. clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java (44)
  19. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchConfigTest.java (8)
  20. clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java (6)
  21. clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java (3)

18
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java

@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -246,13 +247,22 @@ public final class ClientUtils {
}
}
public static <T> List createConfiguredInterceptors(AbstractConfig config,
String interceptorClassesConfigName,
Class<T> clazz) {
public static <T> List configuredInterceptors(AbstractConfig config,
String interceptorClassesConfigName,
Class<T> clazz) {
String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
return config.getConfiguredInstances(
interceptorClassesConfigName,
clazz,
Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
}
}
public static ClusterResourceListeners configureClusterResourceListeners(List<?>... candidateLists) {
ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
for (List<?> candidateList: candidateLists)
clusterResourceListeners.maybeAddAll(candidateList);
return clusterResourceListeners;
}
}
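
A brief sketch of the filtering behavior of the new helper (an assumed example; it relies on maybeAddAll() keeping only elements that implement ClusterResourceListener):

import java.util.Arrays;
import java.util.Collections;

import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ListenerWiringSketch {
    public static void main(String[] args) {
        // maybeAddAll() registers candidates that implement ClusterResourceListener
        // and silently skips everything else, so heterogeneous lists are fine.
        ClusterResourceListeners listeners = ClientUtils.configureClusterResourceListeners(
                Collections.singletonList(new StringDeserializer()), // not a listener: skipped
                Arrays.asList("not-a-listener", 42));                // also skipped, no error
    }
}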

80
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
@@ -51,12 +52,12 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
@@ -76,16 +77,19 @@ import java.util.regex.Pattern;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerInterceptors;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchConfig;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createIsolationLevel;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createKeyDeserializer;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createValueDeserializer;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
import static org.apache.kafka.common.utils.Utils.isBlank;
import static org.apache.kafka.common.utils.Utils.join;
import static org.apache.kafka.common.utils.Utils.propsToMap;
import static org.apache.kafka.common.utils.Utils.swallow;
/**
* A client that consumes records from a Kafka cluster.
@@ -576,8 +580,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final String clientId;
private final Optional<String> groupId;
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
private final Deserializers<K, V> deserializers;
private final Fetcher<K, V> fetcher;
private final OffsetFetcher offsetFetcher;
private final TopicMetadataFetcher topicMetadataFetcher;
@@ -649,7 +652,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public KafkaConsumer(Properties properties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
this(propsToMap(properties), keyDeserializer, valueDeserializer);
}
/**
@@ -696,19 +699,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metrics = createMetrics(config, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList = createConsumerInterceptors(config);
List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
this.interceptors = new ConsumerInterceptors<>(interceptorList);
this.keyDeserializer = createKeyDeserializer(config, keyDeserializer);
this.valueDeserializer = createValueDeserializer(config, valueDeserializer);
this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
this.subscriptions = createSubscriptionState(config, logContext);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keyDeserializer,
this.valueDeserializer, metrics.reporters(), interceptorList);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
interceptorList,
Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
this.metadata.bootstrap(addresses);
FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
this.isolationLevel = createIsolationLevel(config);
this.isolationLevel = configuredIsolationLevel(config);
ApiVersions apiVersions = new ApiVersions();
this.client = createConsumerNetworkClient(config,
@@ -746,7 +750,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
}
FetchConfig<K, V> fetchConfig = createFetchConfig(config, this.keyDeserializer, this.valueDeserializer);
FetchConfig<K, V> fetchConfig = createFetchConfig(config, this.deserializers);
this.fetcher = new Fetcher<>(
logContext,
this.client,
@@ -805,8 +809,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.log = logContext.logger(getClass());
this.clientId = clientId;
this.coordinator = coordinator;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
this.fetcher = fetcher;
this.offsetFetcher = offsetFetcher;
this.topicMetadataFetcher = topicMetadataFetcher;
@@ -903,13 +906,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.unsubscribe();
} else {
for (String topic : topics) {
if (Utils.isBlank(topic))
if (isBlank(topic))
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
throwIfNoAssignorsConfigured();
fetcher.clearBufferedDataForUnassignedTopics(topics);
log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));
log.info("Subscribed to topic(s): {}", join(topics, ", "));
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
metadata.requestUpdateForNewTopics();
}
@@ -1054,7 +1057,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
} else {
for (TopicPartition tp : partitions) {
String topic = (tp != null) ? tp.topic() : null;
if (Utils.isBlank(topic))
if (isBlank(topic))
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
}
fetcher.clearBufferedDataForUnassignedPartitions(partitions);
@@ -1064,7 +1067,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (coordinator != null)
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
log.info("Assigned to partition(s): {}", join(partitions, ", "));
if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
metadata.requestUpdateForNewTopics();
}
@@ -1859,8 +1862,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
"larger to relax the threshold.");
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
"larger to relax the threshold.");
} else {
offsets.forEach(this::updateLastSeenEpochIfNewer);
return offsets;
@@ -2230,7 +2233,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// since we would not try to poll the network client synchronously
if (lag == null) {
if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
subscriptions.requestPartitionEndOffset(topicPartition);
offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
@@ -2362,16 +2365,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.client.wakeup();
}
private ClusterResourceListeners configureClusterResourceListeners(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, List<?>... candidateLists) {
ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
for (List<?> candidateList: candidateLists)
clusterResourceListeners.maybeAddAll(candidateList);
clusterResourceListeners.maybeAdd(keyDeserializer);
clusterResourceListeners.maybeAdd(valueDeserializer);
return clusterResourceListeners;
}
private Timer createTimerForRequest(final Duration timeout) {
// this.time could be null if an exception occurs in constructor prior to setting the this.time field
final Time localTime = (time == null) ? Time.SYSTEM : time;
@@ -2388,7 +2381,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// consumer.
if (coordinator != null) {
// This is a blocking call bound by the time remaining in closeTimer
Utils.swallow(log, Level.ERROR, "Failed to close coordinator with a timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer), firstException);
swallow(log, Level.ERROR, "Failed to close coordinator with a timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer), firstException);
}
if (fetcher != null) {
@@ -2401,15 +2394,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
closeTimer.reset(remainingDurationInTimeout);
// This is a blocking call bound by the time remaining in closeTimer
Utils.swallow(log, Level.ERROR, "Failed to close fetcher with a timeout(ms)=" + closeTimer.timeoutMs(), () -> fetcher.close(closeTimer), firstException);
swallow(log, Level.ERROR, "Failed to close fetcher with a timeout(ms)=" + closeTimer.timeoutMs(), () -> fetcher.close(closeTimer), firstException);
}
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
Utils.closeQuietly(metrics, "consumer metrics", firstException);
Utils.closeQuietly(client, "consumer network client", firstException);
Utils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
Utils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
closeQuietly(interceptors, "consumer interceptors", firstException);
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(client, "consumer network client", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer has been closed");
Throwable exception = firstException.get();
@@ -2496,7 +2488,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private void throwIfNoAssignorsConfigured() {
if (assignors.isEmpty())
throw new IllegalStateException("Must configure at least one partition assigner class name to " +
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}
private void maybeThrowInvalidGroupIdException() {
@@ -2518,4 +2510,4 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return updateAssignmentMetadataIfNeeded(timer, true);
}
}
}

4
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java

@@ -295,9 +295,9 @@ class CompletedFetch<K, V> {
long timestamp = record.timestamp();
Headers headers = new RecordHeaders(record.headers());
ByteBuffer keyBytes = record.key();
K key = keyBytes == null ? null : fetchConfig.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
K key = keyBytes == null ? null : fetchConfig.deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
ByteBuffer valueBytes = record.value();
V value = valueBytes == null ? null : fetchConfig.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
V value = valueBytes == null ? null : fetchConfig.deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
timestamp, timestampType,
keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),

63
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java

@@ -32,7 +32,6 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -40,6 +39,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -87,22 +87,19 @@ public final class ConsumerUtils {
}
public static LogContext createLogContext(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig) {
String groupId = String.valueOf(groupRebalanceConfig.groupId);
String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
String logPrefix;
String groupInstanceId = groupRebalanceConfig.groupInstanceId.orElse(null);
if (groupInstanceId != null) {
// If group.instance.id is set, we will append it to the log context.
logPrefix = String.format("[Consumer instanceId=%s, clientId=%s, groupId=%s] ", groupInstanceId, clientId, groupId);
Optional<String> groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
// If group.instance.id is set, we will append it to the log context.
if (groupRebalanceConfig.groupInstanceId.isPresent()) {
return new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
} else {
logPrefix = String.format("[Consumer clientId=%s, groupId=%s] ", clientId, groupId);
return new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
}
return new LogContext(logPrefix);
}
public static IsolationLevel createIsolationLevel(ConsumerConfig config) {
public static IsolationLevel configuredIsolationLevel(ConsumerConfig config) {
String s = config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT);
return IsolationLevel.valueOf(s);
}
@@ -134,44 +131,14 @@ public final class ConsumerUtils {
}
public static <K, V> FetchConfig<K, V> createFetchConfig(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
IsolationLevel isolationLevel = createIsolationLevel(config);
return new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel);
Deserializers<K, V> deserializers) {
IsolationLevel isolationLevel = configuredIsolationLevel(config);
return new FetchConfig<>(config, deserializers, isolationLevel);
}
@SuppressWarnings("unchecked")
public static <K, V> List<ConsumerInterceptor<K, V>> createConsumerInterceptors(ConsumerConfig config) {
return ClientUtils.createConfiguredInterceptors(config,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
}
@SuppressWarnings("unchecked")
public static <K> Deserializer<K> createKeyDeserializer(ConsumerConfig config, Deserializer<K> keyDeserializer) {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (keyDeserializer == null) {
keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
return keyDeserializer;
public static <K, V> List<ConsumerInterceptor<K, V>> configuredConsumerInterceptors(ConsumerConfig config) {
return (List<ConsumerInterceptor<K, V>>) ClientUtils.configuredInterceptors(config, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
}
@SuppressWarnings("unchecked")
public static <V> Deserializer<V> createValueDeserializer(ConsumerConfig config, Deserializer<V> valueDeserializer) {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (valueDeserializer == null) {
valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
return valueDeserializer;
}
}

12
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java

@@ -57,13 +57,11 @@ public class CoordinatorRequestManager implements RequestManager {
private long totalDisconnectedMin = 0;
private Node coordinator;
public CoordinatorRequestManager(
final Time time,
final LogContext logContext,
final long retryBackoffMs,
final ErrorEventHandler errorHandler,
final String groupId
) {
public CoordinatorRequestManager(final Time time,
final LogContext logContext,
final long retryBackoffMs,
final ErrorEventHandler errorHandler,
final String groupId) {
Objects.requireNonNull(groupId);
this.time = time;
this.log = logContext.logger(this.getClass());

98
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java

@@ -34,12 +34,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import static java.util.Objects.requireNonNull;
@@ -68,10 +65,9 @@ public class DefaultBackgroundThread extends KafkaThread {
private final NetworkClientDelegate networkClientDelegate;
private final ErrorEventHandler errorEventHandler;
private final GroupState groupState;
private final SubscriptionState subscriptionState;
private boolean running;
private final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry;
private final RequestManagers requestManagers;
// Visible for testing
DefaultBackgroundThread(final Time time,
@@ -79,7 +75,6 @@ public class DefaultBackgroundThread extends KafkaThread {
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final SubscriptionState subscriptionState,
final ErrorEventHandler errorEventHandler,
final ApplicationEventProcessor processor,
final ConsumerMetadata metadata,
@@ -99,12 +94,11 @@ public class DefaultBackgroundThread extends KafkaThread {
this.networkClientDelegate = networkClient;
this.errorEventHandler = errorEventHandler;
this.groupState = groupState;
this.subscriptionState = subscriptionState;
this.requestManagerRegistry = new HashMap<>();
this.requestManagerRegistry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorManager));
this.requestManagerRegistry.put(RequestManager.Type.COMMIT, Optional.ofNullable(commitRequestManager));
this.requestManagers = new RequestManagers(Optional.ofNullable(coordinatorManager),
Optional.ofNullable(commitRequestManager));
}
public DefaultBackgroundThread(final Time time,
final ConsumerConfig config,
final GroupRebalanceConfig rebalanceConfig,
@@ -129,7 +123,6 @@ public class DefaultBackgroundThread extends KafkaThread {
this.log = logContext.logger(getClass());
this.applicationEventQueue = applicationEventQueue;
this.backgroundEventQueue = backgroundEventQueue;
this.subscriptionState = subscriptionState;
this.config = config;
this.metadata = metadata;
final NetworkClient networkClient = ClientUtils.createNetworkClient(config,
@@ -149,33 +142,33 @@ public class DefaultBackgroundThread extends KafkaThread {
this.running = true;
this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
this.groupState = new GroupState(rebalanceConfig);
this.requestManagerRegistry = Collections.unmodifiableMap(buildRequestManagerRegistry(logContext));
this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry, metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background processor", e.getCause());
}
}
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
private Map<RequestManager.Type, Optional<RequestManager>> buildRequestManagerRegistry(final LogContext logContext) {
Map<RequestManager.Type, Optional<RequestManager>> registry = new HashMap<>();
CoordinatorRequestManager coordinatorManager = groupState.groupId == null ?
null :
new CoordinatorRequestManager(
time,
if (groupState.groupId != null) {
CoordinatorRequestManager coordinatorManager = new CoordinatorRequestManager(
this.time,
logContext,
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
errorEventHandler,
retryBackoffMs,
this.errorEventHandler,
groupState.groupId);
CommitRequestManager commitRequestManager = coordinatorManager == null ?
null :
new CommitRequestManager(time,
logContext, this.subscriptionState, config,
CommitRequestManager commitRequestManager = new CommitRequestManager(
this.time,
logContext,
subscriptionState,
config,
coordinatorManager,
groupState);
registry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorManager));
registry.put(RequestManager.Type.COMMIT, Optional.ofNullable(commitRequestManager));
return registry;
this.requestManagers = new RequestManagers(Optional.of(coordinatorManager),
Optional.of(commitRequestManager));
} else {
this.requestManagers = new RequestManagers(Optional.empty(), Optional.empty());
}
this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagers, metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background processor", e.getCause());
}
}
@Override
@@ -206,9 +199,19 @@ public class DefaultBackgroundThread extends KafkaThread {
* 3. Poll the networkClient to send and retrieve the response.
*/
void runOnce() {
drain();
if (!applicationEventQueue.isEmpty()) {
LinkedList<ApplicationEvent> res = new LinkedList<>();
this.applicationEventQueue.drainTo(res);
for (ApplicationEvent event : res) {
log.debug("Consuming application event: {}", event);
Objects.requireNonNull(event);
applicationEventProcessor.process(event);
}
}
final long currentTimeMs = time.milliseconds();
final long pollWaitTimeMs = requestManagerRegistry.values().stream()
final long pollWaitTimeMs = requestManagers.entries().stream()
.filter(Optional::isPresent)
.map(m -> m.get().poll(currentTimeMs))
.map(this::handlePollResult)
@@ -216,14 +219,6 @@ public class DefaultBackgroundThread extends KafkaThread {
networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
}
private void drain() {
Queue<ApplicationEvent> events = pollApplicationEvent();
for (ApplicationEvent event : events) {
log.debug("Consuming application event: {}", event);
consumeApplicationEvent(event);
}
}
long handlePollResult(NetworkClientDelegate.PollResult res) {
if (!res.unsentRequests.isEmpty()) {
networkClientDelegate.addAll(res.unsentRequests);
@@ -231,21 +226,6 @@ public class DefaultBackgroundThread extends KafkaThread {
return res.timeUntilNextPollMs;
}
private Queue<ApplicationEvent> pollApplicationEvent() {
if (this.applicationEventQueue.isEmpty()) {
return new LinkedList<>();
}
LinkedList<ApplicationEvent> res = new LinkedList<>();
this.applicationEventQueue.drainTo(res);
return res;
}
private void consumeApplicationEvent(final ApplicationEvent event) {
requireNonNull(event);
applicationEventProcessor.process(event);
}
public boolean isRunning() {
return this.running;
}

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java

@@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* An {@code EventHandler} that uses a single background thread to consume {@code ApplicationEvent} and produce
* {@code BackgroundEvent} from the {@ConsumerBackgroundThread}.
* {@code BackgroundEvent} from the {@link DefaultBackgroundThread}.
*/
public class DefaultEventHandler implements EventHandler {
private final BlockingQueue<ApplicationEvent> applicationEventQueue;

78
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
public class Deserializers<K, V> implements AutoCloseable {
public final Deserializer<K> keyDeserializer;
public final Deserializer<V> valueDeserializer;
public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Key deserializer provided to Deserializers should not be null");
this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "Value deserializer provided to Deserializers should not be null");
}
public Deserializers(ConsumerConfig config) {
this(config, null, null);
}
@SuppressWarnings("unchecked")
public Deserializers(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer = valueDeserializer;
}
}
@Override
public void close() {
AtomicReference<Throwable> firstException = new AtomicReference<>();
Utils.closeQuietly(keyDeserializer, "key deserializer", firstException);
Utils.closeQuietly(valueDeserializer, "value deserializer", firstException);
Throwable exception = firstException.get();
if (exception != null) {
if (exception instanceof InterruptException) {
throw (InterruptException) exception;
}
throw new KafkaException("Failed to close deserializers", exception);
}
}
}
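
A hedged sketch of the two construction paths above (illustrative; the config values and class name are assumptions):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DeserializersSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        ConsumerConfig config = new ConsumerConfig(props);

        // Path 1: both deserializers come from the config and are configure()d with
        // the client id, exactly as the old createKey/ValueDeserializer helpers did.
        try (Deserializers<String, String> fromConfig = new Deserializers<>(config)) {
            fromConfig.keyDeserializer.deserialize("topic", "key".getBytes());
        }

        // Path 2: caller-supplied instances are used as-is. (With the
        // (config, key, value) constructor, non-null arguments cause the
        // corresponding config entries to be ignore()d rather than instantiated.)
        Deserializers<String, String> explicit =
                new Deserializers<>(new StringDeserializer(), new StringDeserializer());
        explicit.close(); // closes both delegates, surfacing the first failure if any
    }
}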

23
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java

@@ -41,8 +41,9 @@ import java.util.Objects;
* <li>{@link #maxPollRecords}: {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}</li>
* <li>{@link #checkCrcs}: {@link ConsumerConfig#CHECK_CRCS_CONFIG}</li>
* <li>{@link #clientRackId}: {@link ConsumerConfig#CLIENT_RACK_CONFIG}</li>
* <li>{@link #keyDeserializer}: {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG}</li>
* <li>{@link #valueDeserializer}: {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}</li>
* <li>{@link #deserializers}:
* {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG}/{@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
* </li>
* <li>{@link #isolationLevel}: {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG}</li>
* </ul>
*
@@ -66,8 +67,7 @@ public class FetchConfig<K, V> {
final int maxPollRecords;
final boolean checkCrcs;
final String clientRackId;
final Deserializer<K> keyDeserializer;
final Deserializer<V> valueDeserializer;
final Deserializers<K, V> deserializers;
final IsolationLevel isolationLevel;
public FetchConfig(int minBytes,
@@ -77,8 +77,7 @@ public class FetchConfig<K, V> {
int maxPollRecords,
boolean checkCrcs,
String clientRackId,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Deserializers<K, V> deserializers,
IsolationLevel isolationLevel) {
this.minBytes = minBytes;
this.maxBytes = maxBytes;
@@ -87,14 +86,12 @@ public class FetchConfig<K, V> {
this.maxPollRecords = maxPollRecords;
this.checkCrcs = checkCrcs;
this.clientRackId = clientRackId;
this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Message key deserializer provided to FetchConfig should not be null");
this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "Message value deserializer provided to FetchConfig should not be null");
this.deserializers = Objects.requireNonNull(deserializers, "Message deserializers provided to FetchConfig should not be null");
this.isolationLevel = isolationLevel;
}
public FetchConfig(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Deserializers<K, V> deserializers,
IsolationLevel isolationLevel) {
this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
this.maxBytes = config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG);
@@ -103,8 +100,7 @@ public class FetchConfig<K, V> {
this.maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
this.checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
this.clientRackId = config.getString(ConsumerConfig.CLIENT_RACK_CONFIG);
this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Message key deserializer provided to FetchConfig should not be null");
this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "Message value deserializer provided to FetchConfig should not be null");
this.deserializers = Objects.requireNonNull(deserializers, "Message deserializers provided to FetchConfig should not be null");
this.isolationLevel = isolationLevel;
}
@@ -118,8 +114,7 @@ public class FetchConfig<K, V> {
", maxPollRecords=" + maxPollRecords +
", checkCrcs=" + checkCrcs +
", clientRackId='" + clientRackId + '\'' +
", keyDeserializer=" + keyDeserializer +
", valueDeserializer=" + valueDeserializer +
", deserializers=" + deserializers +
", isolationLevel=" + isolationLevel +
'}';
}
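
A short sketch of how the slimmer FetchConfig is now built (not from the patch; the config values are assumptions):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FetchConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        ConsumerConfig config = new ConsumerConfig(props);
        Deserializers<String, String> deserializers = new Deserializers<>(config);

        // createFetchConfig resolves the isolation level internally via
        // configuredIsolationLevel(config), so callers now pass just two arguments.
        FetchConfig<String, String> fetchConfig = ConsumerUtils.createFetchConfig(config, deserializers);
        System.out.println(fetchConfig); // toString() now prints the Deserializers holder
    }
}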

12
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java

@@ -202,16 +202,13 @@ public class NetworkClientDelegate implements AutoCloseable {
private Optional<Node> node; // empty if random node can be chosen
private Timer timer;
public UnsentRequest(
final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node) {
public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder, final Optional<Node> node) {
this(requestBuilder, node, new FutureCompletionHandler());
}
public UnsentRequest(
final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node,
final FutureCompletionHandler handler) {
public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node,
final FutureCompletionHandler handler) {
Objects.requireNonNull(requestBuilder);
this.requestBuilder = requestBuilder;
this.node = node;
@@ -246,6 +243,7 @@ public class NetworkClientDelegate implements AutoCloseable {
}
public static class FutureCompletionHandler implements RequestCompletionHandler {
private final CompletableFuture<ClientResponse> future;
FutureCompletionHandler() {

70
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java

@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -46,10 +47,10 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -69,12 +70,14 @@ import java.util.regex.Pattern;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerInterceptors;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createKeyDeserializer;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createValueDeserializer;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
import static org.apache.kafka.common.utils.Utils.isBlank;
import static org.apache.kafka.common.utils.Utils.join;
import static org.apache.kafka.common.utils.Utils.propsToMap;
/**
* This prototype consumer uses the EventHandler to process application
@@ -90,8 +93,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
private final Time time;
private final Optional<String> groupId;
private final Logger log;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
private final Deserializers<K, V> deserializers;
private final SubscriptionState subscriptions;
private final Metrics metrics;
private final long defaultApiTimeoutMs;
@@ -99,7 +101,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
public PrototypeAsyncConsumer(final Properties properties,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
this(propsToMap(properties), keyDeserializer, valueDeserializer);
}
public PrototypeAsyncConsumer(final Map<String, Object> configs,
@@ -107,6 +109,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
final Deserializer<V> valDeser) {
this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, valDeser)), keyDeser, valDeser);
}
public PrototypeAsyncConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
@@ -117,13 +120,14 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.logContext = createLogContext(config, groupRebalanceConfig);
this.log = logContext.logger(getClass());
this.keyDeserializer = createKeyDeserializer(config, keyDeserializer);
this.valueDeserializer = createValueDeserializer(config, valueDeserializer);
this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
this.subscriptions = createSubscriptionState(config, logContext);
this.metrics = createMetrics(config, time);
List<ConsumerInterceptor<K, V>> interceptorList = createConsumerInterceptors(config);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keyDeserializer,
this.valueDeserializer, metrics.reporters(), interceptorList);
List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
interceptorList,
Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
this.eventHandler = new DefaultEventHandler(
config,
groupRebalanceConfig,
@@ -137,28 +141,25 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
}
// Visible for testing
PrototypeAsyncConsumer(
Time time,
LogContext logContext,
ConsumerConfig config,
SubscriptionState subscriptionState,
EventHandler eventHandler,
Metrics metrics,
Optional<String> groupId,
int defaultApiTimeoutMs) {
PrototypeAsyncConsumer(Time time,
LogContext logContext,
ConsumerConfig config,
SubscriptionState subscriptions,
EventHandler eventHandler,
Metrics metrics,
Optional<String> groupId,
int defaultApiTimeoutMs) {
this.time = time;
this.logContext = logContext;
this.log = logContext.logger(getClass());
this.subscriptions = subscriptionState;
this.subscriptions = subscriptions;
this.metrics = metrics;
this.groupId = groupId;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.keyDeserializer = createKeyDeserializer(config, null);
this.valueDeserializer = createValueDeserializer(config, null);
this.deserializers = new Deserializers<>(config);
this.eventHandler = eventHandler;
}
/**
* poll implementation using {@link EventHandler}.
* 1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
@@ -436,7 +437,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
@Override
public void close(Duration timeout) {
AtomicReference<Throwable> firstException = new AtomicReference<>();
Utils.closeQuietly(this.eventHandler, "event handler", firstException);
closeQuietly(this.eventHandler, "event handler", firstException);
log.debug("Kafka consumer has been closed");
Throwable exception = firstException.get();
if (exception != null) {
@@ -522,7 +523,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
for (TopicPartition tp : partitions) {
String topic = (tp != null) ? tp.topic() : null;
if (Utils.isBlank(topic))
if (isBlank(topic))
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
}
@@ -534,7 +535,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
// be no following rebalance
eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));
log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
log.info("Assigned to partition(s): {}", join(partitions, ", "));
if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
}
@@ -560,19 +561,6 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
throw new KafkaException("method not implemented");
}
private static <K, V> ClusterResourceListeners configureClusterResourceListeners(
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final List<?>... candidateLists) {
ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
for (List<?> candidateList: candidateLists)
clusterResourceListeners.maybeAddAll(candidateList);
clusterResourceListeners.maybeAdd(keyDeserializer);
clusterResourceListeners.maybeAdd(valueDeserializer);
return clusterResourceListeners;
}
// This is here temporary as we don't have public access to the ConsumerConfig in this module.
public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,

3
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java

@@ -25,7 +25,4 @@ import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollRes
public interface RequestManager {
PollResult poll(long currentTimeMs);
enum Type {
COORDINATOR, COMMIT
}
}

49
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
* {@code RequestManagers} provides a means to pass around the set of {@link RequestManager} instances in the system.
* This allows callers to both use the specific {@link RequestManager} instance, or to iterate over the list via
* the {@link #entries()} method.
*/
public class RequestManagers {
public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
public final Optional<CommitRequestManager> commitRequestManager;
private final List<Optional<? extends RequestManager>> entries;
public RequestManagers(Optional<CoordinatorRequestManager> coordinatorRequestManager,
Optional<CommitRequestManager> commitRequestManager) {
this.coordinatorRequestManager = coordinatorRequestManager;
this.commitRequestManager = commitRequestManager;
List<Optional<? extends RequestManager>> list = new ArrayList<>();
list.add(coordinatorRequestManager);
list.add(commitRequestManager);
entries = Collections.unmodifiableList(list);
}
public List<Optional<? extends RequestManager>> entries() {
return entries;
}
}
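
For illustration, a hedged sketch of the two access styles the class supports (the managers here are Optional.empty() stand-ins, as for a group-less consumer; the class name is an assumption):

import java.util.Optional;

import org.apache.kafka.clients.consumer.internals.RequestManagers;

public class RequestManagersSketch {
    public static void main(String[] args) {
        // Stand-in: a consumer without a group.id ends up with neither manager.
        RequestManagers managers = new RequestManagers(Optional.empty(), Optional.empty());

        // Typed access, as ApplicationEventProcessor now uses (no casts from a registry map):
        managers.commitRequestManager.ifPresent(m -> System.out.println("commit manager: " + m));

        // Uniform iteration, mirroring DefaultBackgroundThread.runOnce():
        long currentTimeMs = System.currentTimeMillis();
        managers.entries().stream()
                .filter(Optional::isPresent)
                .map(m -> m.get().poll(currentTimeMs)) // each manager contributes a PollResult
                .forEach(result -> System.out.println("poll result: " + result));
    }
}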

44
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

@@ -16,31 +16,28 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
public class ApplicationEventProcessor {
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final Map<RequestManager.Type, Optional<RequestManager>> registry;
private final ConsumerMetadata metadata;
private final RequestManagers requestManagers;
public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry,
final RequestManagers requestManagers,
final ConsumerMetadata metadata) {
this.backgroundEventQueue = backgroundEventQueue;
this.registry = requestManagerRegistry;
this.requestManagers = requestManagers;
this.metadata = metadata;
}
@@ -75,19 +72,17 @@ public class ApplicationEventProcessor {
}
private boolean process(final PollApplicationEvent event) {
Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
if (!commitRequestManger.isPresent()) {
if (!requestManagers.commitRequestManager.isPresent()) {
return true;
}
CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
CommitRequestManager manager = requestManagers.commitRequestManager.get();
manager.updateAutoCommitTimer(event.pollTimeMs);
return true;
}
private boolean process(final CommitApplicationEvent event) {
Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
if (!commitRequestManger.isPresent()) {
if (!requestManagers.commitRequestManager.isPresent()) {
// Leaving this error handling here, but it is a bit strange as the commit API should enforce the group.id
// upfront so we should never get to this block.
Exception exception = new KafkaException("Unable to commit offset. Most likely because the group.id wasn't set");
@@ -95,7 +90,7 @@ public class ApplicationEventProcessor {
return false;
}
CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
CommitRequestManager manager = requestManagers.commitRequestManager.get();
manager.addOffsetCommitRequest(event.offsets()).whenComplete((r, e) -> {
if (e != null) {
event.future().completeExceptionally(e);
@@ -107,20 +102,18 @@ public class ApplicationEventProcessor {
}
private boolean process(final OffsetFetchApplicationEvent event) {
Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = event.future();
if (!commitRequestManger.isPresent()) {
future.completeExceptionally(new KafkaException("Unable to fetch committed offset because the " +
"CommittedRequestManager is not available. Check if group.id was set correctly"));
if (!requestManagers.commitRequestManager.isPresent()) {
event.future().completeExceptionally(new KafkaException("Unable to fetch committed " +
"offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
return false;
}
CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
CommitRequestManager manager = requestManagers.commitRequestManager.get();
manager.addOffsetFetchRequest(event.partitions()).whenComplete((r, e) -> {
if (e != null) {
future.completeExceptionally(e);
event.future().completeExceptionally(e);
return;
}
future.complete(r);
event.future().complete(r);
});
return true;
}
@@ -131,11 +124,10 @@ public class ApplicationEventProcessor {
}
private boolean process(final AssignmentChangeApplicationEvent event) {
Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
if (!commitRequestManger.isPresent()) {
if (!requestManagers.commitRequestManager.isPresent()) {
return false;
}
CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
CommitRequestManager manager = requestManagers.commitRequestManager.get();
manager.updateAutoCommitTimer(event.currentTimeMs);
manager.maybeAutoCommit(event.offsets);
return true;

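RequestManagers itself (file 13 in the change list, 49 lines) is not shown in this excerpt. From the field access above and the two-argument construction in DefaultBackgroundThreadTest below, it is presumably a simple typed holder, roughly:

    // Sketch inferred from the call sites in this diff; the real class may
    // carry additional wiring for the background thread's poll loop.
    public class RequestManagers {
        public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
        public final Optional<CommitRequestManager> commitRequestManager;

        public RequestManagers(final Optional<CoordinatorRequestManager> coordinatorRequestManager,
                               final Optional<CommitRequestManager> commitRequestManager) {
            this.coordinatorRequestManager = coordinatorRequestManager;
            this.commitRequestManager = commitRequestManager;
        }
    }

The typed Optional fields are what let the handlers above drop both the untyped registry lookup and the (CommitRequestManager) cast.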
19
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -74,6 +74,7 @@ import org.slf4j.Logger;
 import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -390,15 +391,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.valueSerializer = valueSerializer;
         }

-        List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.createConfiguredInterceptors(config,
+        List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.configuredInterceptors(config,
                 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                 ProducerInterceptor.class);
         if (interceptors != null)
             this.interceptors = interceptors;
         else
             this.interceptors = new ProducerInterceptors<>(interceptorList);
-        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(this.keySerializer,
-                this.valueSerializer, interceptorList, reporters);
+        ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
+                interceptorList,
+                reporters,
+                Arrays.asList(this.keySerializer, this.valueSerializer));
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
         this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
@@ -1348,16 +1351,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         log.debug("Kafka producer has been closed");
     }

-    private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> valueSerializer, List<?>... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-        for (List<?> candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
-        clusterResourceListeners.maybeAdd(keySerializer);
-        clusterResourceListeners.maybeAdd(valueSerializer);
-        return clusterResourceListeners;
-    }
-
     /**
      * computes partition for given record.
      * if the record has partition returns the value otherwise

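The ClientUtils hunk that receives the moved logic is not part of this excerpt, but the deleted private helper above and the new three-list call site suggest the shared version is essentially the same loop with the serializer-specific calls generalized into an extra candidate list, along the lines of:

    // Sketch of the presumed shared helper in ClientUtils: serializers are now
    // passed as just another List<?> (via Arrays.asList at the call site), so the
    // dedicated maybeAdd(keySerializer)/maybeAdd(valueSerializer) calls go away.
    public static ClusterResourceListeners configureClusterResourceListeners(final List<?>... candidateLists) {
        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
        for (List<?> candidateList : candidateLists)
            clusterResourceListeners.maybeAddAll(candidateList);
        return clusterResourceListeners;
    }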
4
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.clients.consumer.internals.Deserializers;
 import org.apache.kafka.clients.consumer.internals.FetchConfig;
 import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
@@ -2679,8 +2680,7 @@ public class KafkaConsumerTest {
                 maxPollRecords,
                 checkCrcs,
                 CommonClientConfigs.DEFAULT_CLIENT_RACK,
-                keyDeserializer,
-                deserializer,
+                new Deserializers<>(keyDeserializer, deserializer),
                 isolationLevel);
         Fetcher<String, String> fetcher = new Fetcher<>(
                 loggerFactory,

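Deserializers (file 8 in the change list, 78 lines) is likewise not shown here. The test call sites imply a small generic pair-holder, presumably along the lines of:

    // Minimal sketch inferred from usage such as
    // new Deserializers<>(keyDeserializer, valueDeserializer); the actual class
    // may add more, e.g. construction from a ConsumerConfig or close() handling.
    public class Deserializers<K, V> {
        public final Deserializer<K> keyDeserializer;
        public final Deserializer<V> valueDeserializer;

        public Deserializers(final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
            this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "key deserializer cannot be null");
            this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "value deserializer cannot be null");
        }
    }

Passing one Deserializers<K, V> instead of two loose Deserializer arguments is what shortens every FetchConfig construction in the tests below.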
3
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java

@@ -229,8 +229,7 @@ public class CompletedFetchTest {
                 ConsumerConfig.DEFAULT_MAX_POLL_RECORDS,
                 checkCrcs,
                 ConsumerConfig.DEFAULT_CLIENT_RACK,
-                keyDeserializer,
-                valueDeserializer,
+                new Deserializers<>(keyDeserializer, valueDeserializer),
                 isolationLevel
         );
         return new CompletedFetch<>(

44
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java

@@ -41,7 +41,6 @@ import org.mockito.Mockito;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -68,10 +67,9 @@ public class DefaultBackgroundThreadTest {
     private NetworkClientDelegate networkClient;
     private BlockingQueue<BackgroundEvent> backgroundEventsQueue;
     private BlockingQueue<ApplicationEvent> applicationEventsQueue;
-    private ApplicationEventProcessor processor;
+    private ApplicationEventProcessor applicationEventProcessor;
     private CoordinatorRequestManager coordinatorManager;
     private ErrorEventHandler errorEventHandler;
-    private SubscriptionState subscriptionState;
     private int requestTimeoutMs = 500;
     private GroupState groupState;
     private CommitRequestManager commitManager;
@@ -84,10 +82,9 @@ public class DefaultBackgroundThreadTest {
         this.networkClient = mock(NetworkClientDelegate.class);
         this.applicationEventsQueue = (BlockingQueue<ApplicationEvent>) mock(BlockingQueue.class);
         this.backgroundEventsQueue = (BlockingQueue<BackgroundEvent>) mock(BlockingQueue.class);
-        this.processor = mock(ApplicationEventProcessor.class);
+        this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
         this.coordinatorManager = mock(CoordinatorRequestManager.class);
         this.errorEventHandler = mock(ErrorEventHandler.class);
-        this.subscriptionState = mock(SubscriptionState.class);
         GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
                 100,
                 100,
@@ -98,9 +95,6 @@ public class DefaultBackgroundThreadTest {
                 true);
         this.groupState = new GroupState(rebalanceConfig);
         this.commitManager = mock(CommitRequestManager.class);
-        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
     }

     @Test
@@ -124,7 +118,7 @@ public class DefaultBackgroundThreadTest {
         ApplicationEvent e = new NoopApplicationEvent("noop event");
         this.applicationEventsQueue.add(e);
         backgroundThread.runOnce();
-        verify(processor, times(1)).process(e);
+        verify(applicationEventProcessor, times(1)).process(e);
         backgroundThread.close();
     }
@@ -132,8 +126,10 @@ public class DefaultBackgroundThreadTest {
     public void testMetadataUpdateEvent() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
-        this.processor = new ApplicationEventProcessor(this.backgroundEventsQueue, mockRequestManagerRegistry(),
-            metadata);
+        this.applicationEventProcessor = new ApplicationEventProcessor(
+            this.backgroundEventsQueue,
+            mockRequestManagers(),
+            metadata);
         when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
         when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -154,7 +150,7 @@ public class DefaultBackgroundThreadTest {
         ApplicationEvent e = new CommitApplicationEvent(new HashMap<>());
         this.applicationEventsQueue.add(e);
         backgroundThread.runOnce();
-        verify(processor).process(any(CommitApplicationEvent.class));
+        verify(applicationEventProcessor).process(any(CommitApplicationEvent.class));
         backgroundThread.close();
     }
@@ -162,7 +158,9 @@ public class DefaultBackgroundThreadTest {
     public void testAssignmentChangeEvent() {
         this.applicationEventsQueue = new LinkedBlockingQueue<>();
         this.backgroundEventsQueue = new LinkedBlockingQueue<>();
-        this.processor = spy(new ApplicationEventProcessor(this.backgroundEventsQueue, mockRequestManagerRegistry(),
+        this.applicationEventProcessor = spy(new ApplicationEventProcessor(
+            this.backgroundEventsQueue,
+            mockRequestManagers(),
             metadata));

         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -176,7 +174,7 @@ public class DefaultBackgroundThreadTest {
         when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());

         backgroundThread.runOnce();
-        verify(processor).process(any(AssignmentChangeApplicationEvent.class));
+        verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class));
         verify(networkClient, times(1)).poll(anyLong(), anyLong());
         verify(commitManager, times(1)).updateAutoCommitTimer(currentTimeMs);
         verify(commitManager, times(1)).maybeAutoCommit(offset);
@@ -219,11 +217,8 @@ public class DefaultBackgroundThreadTest {
         return topicPartitionOffsets;
     }

-    private Map<RequestManager.Type, Optional<RequestManager>> mockRequestManagerRegistry() {
-        Map<RequestManager.Type, Optional<RequestManager>> registry = new HashMap<>();
-        registry.put(RequestManager.Type.COORDINATOR, Optional.of(coordinatorManager));
-        registry.put(RequestManager.Type.COMMIT, Optional.of(commitManager));
-        return registry;
+    private RequestManagers mockRequestManagers() {
+        return new RequestManagers(Optional.of(coordinatorManager), Optional.of(commitManager));
     }

     private static NetworkClientDelegate.UnsentRequest findCoordinatorUnsentRequest(
@@ -235,21 +230,24 @@ public class DefaultBackgroundThreadTest {
                 new FindCoordinatorRequestData()
                     .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
                     .setKey("foobar")),
-                Optional.empty());
+            Optional.empty());
         req.setTimer(time, timeout);
         return req;
     }

     private DefaultBackgroundThread mockBackgroundThread() {
+        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
+
         return new DefaultBackgroundThread(
                 this.time,
                 new ConsumerConfig(properties),
                 new LogContext(),
                 applicationEventsQueue,
                 backgroundEventsQueue,
-                subscriptionState,
                 this.errorEventHandler,
-                processor,
+                applicationEventProcessor,
                 this.metadata,
                 this.networkClient,
                 this.groupState,
@@ -268,4 +266,4 @@ public class DefaultBackgroundThreadTest {
                 RETRY_BACKOFF_MS,
                 Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs)));
     }
-}
\ No newline at end of file
+}

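Taken together, the test wiring for the processor now reduces to typed construction with plain mocks, e.g.:

    // Shape exercised by testMetadataUpdateEvent above (coordinatorManager,
    // commitManager and metadata are the mocks created in this test class's setup).
    RequestManagers requestManagers = new RequestManagers(
            Optional.of(coordinatorManager),
            Optional.of(commitManager));
    ApplicationEventProcessor processor = new ApplicationEventProcessor(
            backgroundEventsQueue,
            requestManagers,
            metadata);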
8
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchConfigTest.java

@@ -73,7 +73,10 @@ public class FetchConfigTest {
         p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         ConsumerConfig config = new ConsumerConfig(p);
-        new FetchConfig<>(config, keyDeserializer, valueDeserializer, IsolationLevel.READ_UNCOMMITTED);
+        new FetchConfig<>(
+            config,
+            new Deserializers<>(keyDeserializer, valueDeserializer),
+            IsolationLevel.READ_UNCOMMITTED);
     }
     private void newFetchConfigFromValues(Deserializer<String> keyDeserializer,
@@ -85,8 +88,7 @@ public class FetchConfigTest {
                 ConsumerConfig.DEFAULT_MAX_POLL_RECORDS,
                 true,
                 ConsumerConfig.DEFAULT_CLIENT_RACK,
-                keyDeserializer,
-                valueDeserializer,
+                new Deserializers<>(keyDeserializer, valueDeserializer),
                 IsolationLevel.READ_UNCOMMITTED);
     }
 }

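For comparison, the production-side construction in KafkaConsumer (file 2 in the change list, not shown in this excerpt) presumably collapses to the same shape, something like:

    // Hypothetical call-site shape only; the real KafkaConsumer hunk is not
    // included in this view.
    Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
    FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, deserializers, isolationLevel);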
6
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

@@ -2848,8 +2848,7 @@ public class FetcherTest {
                 2 * numPartitions,
                 true, // check crcs
                 CommonClientConfigs.DEFAULT_CLIENT_RACK,
-                new ByteArrayDeserializer(),
-                new ByteArrayDeserializer(),
+                new Deserializers<>(new ByteArrayDeserializer(), new ByteArrayDeserializer()),
                 isolationLevel);
         fetcher = new Fetcher<byte[], byte[]>(
                 logContext,
@@ -3651,8 +3650,7 @@ public class FetcherTest {
                 maxPollRecords,
                 true, // check crc
                 CommonClientConfigs.DEFAULT_CLIENT_RACK,
-                keyDeserializer,
-                valueDeserializer,
+                new Deserializers<>(keyDeserializer, valueDeserializer),
                 isolationLevel);
         fetcher = spy(new Fetcher<>(
                 logContext,

3
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java

@@ -1254,8 +1254,7 @@ public class OffsetFetcherTest {
                 maxPollRecords,
                 true, // check crc
                 CommonClientConfigs.DEFAULT_CLIENT_RACK,
-                new ByteArrayDeserializer(),
-                new ByteArrayDeserializer(),
+                new Deserializers<>(new ByteArrayDeserializer(), new ByteArrayDeserializer()),
                 isolationLevel);
         Fetcher<byte[], byte[]> fetcher = new Fetcher<>(
                 logContext,
