Changes:

1. Introduces FetchRequestManager, which implements the RequestManager API for fetching messages from brokers. Unlike Fetcher, record decompression and deserialization are performed on the application thread inside CompletedFetch.
2. Restructures the code so that objects owned by the background thread are not instantiated until the background thread runs (via Supplier), ensuring that no references to them are available to the application thread.
3. Ensures resources properly implement Closeable and use IdempotentCloser so they are only closed once.
4. Introduces ConsumerTestBuilder to reduce inconsistency in the way objects were built up for tests.

Reviewers: Philip Nee <pnee@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
Kirk True authored 11 months ago, committed by GitHub
59 changed files with 7023 additions and 2452 deletions
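Note on item 2 of the change description above: deferring construction behind java.util.function.Supplier is what keeps background-thread-owned objects invisible to the application thread. The following stand-alone sketch (not part of the patch; the Resource class and thread names are hypothetical) illustrates the pattern that ConsumerNetworkThread.initializeResources() applies to its event processor, network client delegate, and request managers.

import java.util.function.Supplier;

public class DeferredOwnershipSketch {

    // Hypothetical resource that must only ever be touched by the owning thread.
    static class Resource {
        Resource() {
            System.out.println("created on " + Thread.currentThread().getName());
        }

        void poll() {
            System.out.println("polled on " + Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The application thread only holds a Supplier; no Resource exists yet,
        // so no reference to it can leak to the application thread.
        Supplier<Resource> resourceSupplier = Resource::new;

        Thread background = new Thread(() -> {
            // Construction happens here, on the background thread, analogous to
            // ConsumerNetworkThread.initializeResources().
            Resource resource = resourceSupplier.get();
            resource.poll();
        }, "background-thread");

        background.start();
        background.join();
    }
}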
@@ -0,0 +1,268 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
import static org.apache.kafka.common.utils.Utils.closeQuietly;

/**
 * Background thread runnable that consumes {@link ApplicationEvent} and produces {@link BackgroundEvent}. It
 * uses an event loop to consume and produce events, and poll the network client to handle network IO.
 */
public class ConsumerNetworkThread extends KafkaThread implements Closeable {

    private static final long MAX_POLL_TIMEOUT_MS = 5000;
    private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";
    private final Time time;
    private final Logger log;
    private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
    private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
    private final Supplier<RequestManagers> requestManagersSupplier;
    private ApplicationEventProcessor applicationEventProcessor;
    private NetworkClientDelegate networkClientDelegate;
    private RequestManagers requestManagers;
    private volatile boolean running;
    private final IdempotentCloser closer = new IdempotentCloser();
    private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);

    public ConsumerNetworkThread(LogContext logContext,
                                 Time time,
                                 Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                 Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
                                 Supplier<RequestManagers> requestManagersSupplier) {
        super(BACKGROUND_THREAD_NAME, true);
        this.time = time;
        this.log = logContext.logger(getClass());
        this.applicationEventProcessorSupplier = applicationEventProcessorSupplier;
        this.networkClientDelegateSupplier = networkClientDelegateSupplier;
        this.requestManagersSupplier = requestManagersSupplier;
    }

    @Override
    public void run() {
        closer.assertOpen("Consumer network thread is already closed");
        running = true;

        try {
            log.debug("Consumer network thread started");

            // Wait until we're securely in the background network thread to initialize these objects...
            initializeResources();

            while (running) {
                try {
                    runOnce();
                } catch (final WakeupException e) {
                    log.debug("WakeupException caught, consumer network thread won't be interrupted");
                    // swallow the wakeup exception to prevent killing the thread.
                }
            }
        } catch (final Throwable t) {
            log.error("The consumer network thread failed due to unexpected error", t);
            throw new KafkaException(t);
        } finally {
            cleanup();
        }
    }

    void initializeResources() {
        applicationEventProcessor = applicationEventProcessorSupplier.get();
        networkClientDelegate = networkClientDelegateSupplier.get();
        requestManagers = requestManagersSupplier.get();
    }

    /**
     * Poll and process the {@link ApplicationEvent application events}. It performs the following tasks:
     *
     * <ol>
     *     <li>
     *         Drains and processes all the events from the application thread's application event queue via
     *         {@link ApplicationEventProcessor}
     *     </li>
     *     <li>
     *         Iterate through the {@link RequestManager} list and invoke {@link RequestManager#poll(long)} to get
     *         the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll
     *     </li>
     *     <li>
     *         Stage each {@link AbstractRequest.Builder request} to be sent via
     *         {@link NetworkClientDelegate#addAll(List)}
     *     </li>
     *     <li>
     *         Poll the client via {@link KafkaClient#poll(long, long)} to send the requests, as well as
     *         retrieve any available responses
     *     </li>
     * </ol>
     */
    void runOnce() {
        // If there are errors processing any events, the error will be thrown immediately. This will have
        // the effect of closing the background thread.
        applicationEventProcessor.process();

        final long currentTimeMs = time.milliseconds();
        final long pollWaitTimeMs = requestManagers.entries().stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .map(rm -> rm.poll(currentTimeMs))
                .map(networkClientDelegate::addAll)
                .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
        networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
    }

    /**
     * Performs any network I/O that is needed at the time of close for the consumer:
     *
     * <ol>
     *     <li>
     *         Iterate through the {@link RequestManager} list and invoke {@link RequestManager#pollOnClose()}
     *         to get the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll
     *     </li>
     *     <li>
     *         Stage each {@link AbstractRequest.Builder request} to be sent via
     *         {@link NetworkClientDelegate#addAll(List)}
     *     </li>
     *     <li>
     *         {@link KafkaClient#poll(long, long) Poll the client} to send the requests, as well as
     *         retrieve any available responses
     *     </li>
     *     <li>
     *         Continuously {@link KafkaClient#poll(long, long) poll the client} as long as the
     *         {@link Timer#notExpired() timer hasn't expired} to retrieve the responses
     *     </li>
     * </ol>
     */
    // Visible for testing
    static void runAtClose(final Collection<Optional<? extends RequestManager>> requestManagers,
                           final NetworkClientDelegate networkClientDelegate,
                           final Timer timer) {
        // These are the optional outgoing requests at the time of closing the consumer
        List<NetworkClientDelegate.PollResult> pollResults = requestManagers.stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .map(RequestManager::pollOnClose)
                .collect(Collectors.toList());
        long pollWaitTimeMs = pollResults.stream()
                .map(networkClientDelegate::addAll)
                .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
        pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs());
        networkClientDelegate.poll(pollWaitTimeMs, timer.currentTimeMs());
        timer.update();

        List<Future<?>> requestFutures = pollResults.stream()
                .flatMap(fads -> fads.unsentRequests.stream())
                .map(NetworkClientDelegate.UnsentRequest::future)
                .collect(Collectors.toList());

        // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
        // all requests have received a response.
        while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) {
            networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
            timer.update();
        }
    }

    public boolean isRunning() {
        return running;
    }

    public void wakeup() {
        // The network client can be null if the initializeResources method has not yet been called.
        if (networkClientDelegate != null)
            networkClientDelegate.wakeup();
    }

    @Override
    public void close() {
        close(closeTimeout);
    }

    public void close(final Duration timeout) {
        Objects.requireNonNull(timeout, "Close timeout for consumer network thread must be non-null");

        closer.close(
                () -> closeInternal(timeout),
                () -> log.warn("The consumer network thread was already closed")
        );
    }

    /**
     * Starts the closing process.
     *
     * <p/>
     *
     * This method is called from the application thread, but our resources are owned by the network thread. As such,
     * we don't actually close any of those resources here, immediately, on the application thread. Instead, we just
     * update our internal state on the application thread. When the network thread next
     * {@link #run() executes its loop}, it will notice that state, cease processing any further events, and begin
     * {@link #cleanup() closing its resources}.
     *
     * <p/>
     *
     * This method will wait (i.e. block the application thread) for up to the duration of the given timeout to give
     * the network thread the time to close down cleanly.
     *
     * @param timeout Upper bound of time to wait for the network thread to close its resources
     */
    private void closeInternal(final Duration timeout) {
        long timeoutMs = timeout.toMillis();
        log.trace("Signaling the consumer network thread to close in {}ms", timeoutMs);
        running = false;
        closeTimeout = timeout;
        wakeup();

        if (timeoutMs > 0) {
            try {
                join(timeoutMs);
            } catch (InterruptedException e) {
                log.error("Interrupted while waiting for consumer network thread to complete", e);
            }
        }
    }

    void cleanup() {
        log.trace("Closing the consumer network thread");
        Timer timer = time.timer(closeTimeout);
        runAtClose(requestManagers.entries(), networkClientDelegate, timer);
        closeQuietly(requestManagers, "request managers");
        closeQuietly(networkClientDelegate, "network client delegate");
        closeQuietly(applicationEventProcessor, "application event processor");
        log.debug("Closed the consumer network thread");
    }
}
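The close path of ConsumerNetworkThread above only signals the loop from the application thread (set running to false, wake the client, then a bounded join), while the resources themselves are released by the network thread in cleanup(). A stripped-down, hypothetical version of that shutdown handshake, independent of the Kafka classes:

import java.time.Duration;

public class ShutdownHandshakeSketch extends Thread {

    private volatile boolean running = true;

    ShutdownHandshakeSketch() {
        super("network-thread-sketch");
    }

    @Override
    public void run() {
        while (running) {
            // Stand-in for runOnce(): process events, then poll the network client.
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Stand-in for cleanup(): resources owned by this thread are closed here.
        System.out.println("resources closed by " + getName());
    }

    void close(Duration timeout) throws InterruptedException {
        running = false;          // signal the loop from the application thread
        join(timeout.toMillis()); // block the caller for at most the timeout
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownHandshakeSketch thread = new ShutdownHandshakeSketch();
        thread.start();
        Thread.sleep(50);
        thread.close(Duration.ofSeconds(1));
    }
}

The important property is that the application thread never touches the loop's resources directly; it only flips the flag and waits.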
@@ -1,293 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;

import static java.util.Objects.requireNonNull;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel;

/**
 * Background thread runnable that consumes {@code ApplicationEvent} and
 * produces {@code BackgroundEvent}. It uses an event loop to consume and
 * produce events, and poll the network client to handle network IO.
 * <p/>
 * It holds a reference to the {@link SubscriptionState}, which is
 * initialized by the polling thread.
 * <p/>
 * For processing application events that have been submitted to the
 * {@link #applicationEventQueue}, this relies on an {@link ApplicationEventProcessor}. Processing includes generating requests and
 * handling responses with the appropriate {@link RequestManager}. The network operations for
 * actually sending the requests is delegated to the {@link NetworkClientDelegate}
 * </li>
 */
public class DefaultBackgroundThread extends KafkaThread {
    private static final long MAX_POLL_TIMEOUT_MS = 5000;
    private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";
    private final Time time;
    private final Logger log;
    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final ConsumerMetadata metadata;
    private final ConsumerConfig config;
    // empty if groupId is null
    private final ApplicationEventProcessor applicationEventProcessor;
    private final NetworkClientDelegate networkClientDelegate;
    private final ErrorEventHandler errorEventHandler;
    private final GroupState groupState;
    private boolean running;

    private final RequestManagers requestManagers;

    // Visible for testing
    @SuppressWarnings("ParameterNumber")
    DefaultBackgroundThread(final Time time,
                            final ConsumerConfig config,
                            final LogContext logContext,
                            final BlockingQueue<ApplicationEvent> applicationEventQueue,
                            final BlockingQueue<BackgroundEvent> backgroundEventQueue,
                            final ErrorEventHandler errorEventHandler,
                            final ApplicationEventProcessor processor,
                            final ConsumerMetadata metadata,
                            final NetworkClientDelegate networkClient,
                            final GroupState groupState,
                            final CoordinatorRequestManager coordinatorManager,
                            final CommitRequestManager commitRequestManager,
                            final OffsetsRequestManager offsetsRequestManager,
                            final TopicMetadataRequestManager topicMetadataRequestManager,
                            final HeartbeatRequestManager heartbeatRequestManager) {
        super(BACKGROUND_THREAD_NAME, true);
        this.time = time;
        this.running = true;
        this.log = logContext.logger(getClass());
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;
        this.applicationEventProcessor = processor;
        this.config = config;
        this.metadata = metadata;
        this.networkClientDelegate = networkClient;
        this.errorEventHandler = errorEventHandler;
        this.groupState = groupState;
        this.requestManagers = new RequestManagers(
                offsetsRequestManager,
                topicMetadataRequestManager,
                Optional.ofNullable(coordinatorManager),
                Optional.ofNullable(commitRequestManager),
                Optional.ofNullable(heartbeatRequestManager));
    }

    public DefaultBackgroundThread(final Time time,
                                   final ConsumerConfig config,
                                   final GroupRebalanceConfig rebalanceConfig,
                                   final LogContext logContext,
                                   final BlockingQueue<ApplicationEvent> applicationEventQueue,
                                   final BlockingQueue<BackgroundEvent> backgroundEventQueue,
                                   final ConsumerMetadata metadata,
                                   final SubscriptionState subscriptionState,
                                   final ApiVersions apiVersions,
                                   final Metrics metrics,
                                   final Sensor fetcherThrottleTimeSensor) {
        super(BACKGROUND_THREAD_NAME, true);
        requireNonNull(config);
        requireNonNull(rebalanceConfig);
        requireNonNull(logContext);
        requireNonNull(applicationEventQueue);
        requireNonNull(backgroundEventQueue);
        requireNonNull(metadata);
        requireNonNull(subscriptionState);
        try {
            this.time = time;
            this.log = logContext.logger(getClass());
            this.applicationEventQueue = applicationEventQueue;
            this.backgroundEventQueue = backgroundEventQueue;
            this.config = config;
            this.metadata = metadata;
            final NetworkClient networkClient = ClientUtils.createNetworkClient(config,
                    metrics,
                    CONSUMER_METRIC_GROUP_PREFIX,
                    logContext,
                    apiVersions,
                    time,
                    CONSUMER_MAX_INFLIGHT_REQUESTS_PER_CONNECTION,
                    metadata,
                    fetcherThrottleTimeSensor);
            this.networkClientDelegate = new NetworkClientDelegate(
                    this.time,
                    this.config,
                    logContext,
                    networkClient);
            this.running = true;
            this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
            this.groupState = new GroupState(rebalanceConfig);
            long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            final int requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);

            OffsetsRequestManager offsetsRequestManager =
                    new OffsetsRequestManager(
                            subscriptionState,
                            metadata,
                            configuredIsolationLevel(config),
                            time,
                            retryBackoffMs,
                            requestTimeoutMs,
                            apiVersions,
                            networkClientDelegate,
                            logContext);
            CoordinatorRequestManager coordinatorRequestManager = null;
            CommitRequestManager commitRequestManager = null;
            TopicMetadataRequestManager topicMetadataRequestManger = new TopicMetadataRequestManager(
                    logContext,
                    config);
            HeartbeatRequestManager heartbeatRequestManager = null;

            // TODO: consolidate groupState and memberState
            if (groupState.groupId != null) {
                coordinatorRequestManager = new CoordinatorRequestManager(
                        this.time,
                        logContext,
                        retryBackoffMs,
                        retryBackoffMaxMs,
                        this.errorEventHandler,
                        groupState.groupId);
                commitRequestManager = new CommitRequestManager(
                        this.time,
                        logContext,
                        subscriptionState,
                        config,
                        coordinatorRequestManager,
                        groupState);
                MembershipManager membershipManager = new MembershipManagerImpl(groupState.groupId, logContext);
                heartbeatRequestManager = new HeartbeatRequestManager(
                        this.time,
                        logContext,
                        config,
                        coordinatorRequestManager,
                        subscriptionState,
                        membershipManager,
                        errorEventHandler);
            }

            this.requestManagers = new RequestManagers(
                    offsetsRequestManager,
                    topicMetadataRequestManger,
                    Optional.ofNullable(coordinatorRequestManager),
                    Optional.ofNullable(commitRequestManager),
                    Optional.ofNullable(heartbeatRequestManager));
            this.applicationEventProcessor = new ApplicationEventProcessor(
                    backgroundEventQueue,
                    requestManagers,
                    metadata);
        } catch (final Exception e) {
            close();
            throw new KafkaException("Failed to construct background processor", e.getCause());
        }
    }

    @Override
    public void run() {
        try {
            log.debug("Background thread started");
            while (running) {
                try {
                    runOnce();
                } catch (final WakeupException e) {
                    log.debug("WakeupException caught, background thread won't be interrupted");
                    // swallow the wakeup exception to prevent killing the background thread.
                }
            }
        } catch (final Throwable t) {
            log.error("The background thread failed due to unexpected error", t);
            throw new KafkaException(t);
        } finally {
            close();
            log.debug("{} closed", getClass());
        }
    }

    /**
     * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
     * 1. Drains and try to process all the requests in the queue.
     * 2. Iterate through the registry, poll, and get the next poll time for the network poll
     * 3. Poll the networkClient to send and retrieve the response.
     */
    void runOnce() {
        if (!applicationEventQueue.isEmpty()) {
            LinkedList<ApplicationEvent> res = new LinkedList<>();
            this.applicationEventQueue.drainTo(res);

            for (ApplicationEvent event : res) {
                log.debug("Consuming application event: {}", event);
                Objects.requireNonNull(event);
                applicationEventProcessor.process(event);
            }
        }

        final long currentTimeMs = time.milliseconds();
        final long pollWaitTimeMs = requestManagers.entries().stream()
                .filter(Optional::isPresent)
                .map(m -> m.get().poll(currentTimeMs))
                .map(this::handlePollResult)
                .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
        networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
    }

    long handlePollResult(NetworkClientDelegate.PollResult res) {
        if (!res.unsentRequests.isEmpty()) {
            networkClientDelegate.addAll(res.unsentRequests);
        }
        return res.timeUntilNextPollMs;
    }

    public boolean isRunning() {
        return this.running;
    }

    public final void wakeup() {
        networkClientDelegate.wakeup();
    }

    public final void close() {
        this.running = false;
        this.wakeup();
        Utils.closeQuietly(networkClientDelegate, "network client utils");
        Utils.closeQuietly(metadata, "consumer metadata client");
    }
}
@@ -1,151 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * An {@code EventHandler} that uses a single background thread to consume {@code ApplicationEvent} and produce
 * {@code BackgroundEvent} from the {@link DefaultBackgroundThread}.
 */
public class DefaultEventHandler implements EventHandler {

    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final DefaultBackgroundThread backgroundThread;

    public DefaultEventHandler(final ConsumerConfig config,
                               final GroupRebalanceConfig groupRebalanceConfig,
                               final LogContext logContext,
                               final SubscriptionState subscriptionState,
                               final ApiVersions apiVersions,
                               final Metrics metrics,
                               final ClusterResourceListeners clusterResourceListeners,
                               final Sensor fetcherThrottleTimeSensor) {
        this(Time.SYSTEM,
                config,
                groupRebalanceConfig,
                logContext,
                new LinkedBlockingQueue<>(),
                new LinkedBlockingQueue<>(),
                subscriptionState,
                apiVersions,
                metrics,
                clusterResourceListeners,
                fetcherThrottleTimeSensor);
    }

    public DefaultEventHandler(final Time time,
                               final ConsumerConfig config,
                               final GroupRebalanceConfig groupRebalanceConfig,
                               final LogContext logContext,
                               final BlockingQueue<ApplicationEvent> applicationEventQueue,
                               final BlockingQueue<BackgroundEvent> backgroundEventQueue,
                               final SubscriptionState subscriptionState,
                               final ApiVersions apiVersions,
                               final Metrics metrics,
                               final ClusterResourceListeners clusterResourceListeners,
                               final Sensor fetcherThrottleTimeSensor) {
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;

        // Bootstrap a metadata object with the bootstrap server IP address, which will be used once for the
        // subsequent metadata refresh once the background thread has started up.
        final ConsumerMetadata metadata = new ConsumerMetadata(config,
                subscriptionState,
                logContext,
                clusterResourceListeners);
        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
        metadata.bootstrap(addresses);

        this.backgroundThread = new DefaultBackgroundThread(
                time,
                config,
                groupRebalanceConfig,
                logContext,
                this.applicationEventQueue,
                this.backgroundEventQueue,
                metadata,
                subscriptionState,
                apiVersions,
                metrics,
                fetcherThrottleTimeSensor);
        backgroundThread.start();
    }

    // VisibleForTesting
    DefaultEventHandler(final DefaultBackgroundThread backgroundThread,
                        final BlockingQueue<ApplicationEvent> applicationEventQueue,
                        final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
        this.backgroundThread = backgroundThread;
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;
        backgroundThread.start();
    }

    @Override
    public Optional<BackgroundEvent> poll() {
        return Optional.ofNullable(backgroundEventQueue.poll());
    }

    @Override
    public boolean isEmpty() {
        return backgroundEventQueue.isEmpty();
    }

    @Override
    public boolean add(final ApplicationEvent event) {
        backgroundThread.wakeup();
        return applicationEventQueue.add(event);
    }

    @Override
    public <T> T addAndGet(final CompletableApplicationEvent<T> event, final Timer timer) {
        Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null");
        Objects.requireNonNull(timer, "Timer provided to addAndGet must be non-null");
        add(event);
        return event.get(timer);
    }

    public void close() {
        try {
            backgroundThread.close();
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }
}
@@ -0,0 +1,117 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

/**
 * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the
 * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition
 * assignment.
 */
public class FetchRequestManager extends AbstractFetch implements RequestManager {

    private final NetworkClientDelegate networkClientDelegate;

    FetchRequestManager(final LogContext logContext,
                        final Time time,
                        final ConsumerMetadata metadata,
                        final SubscriptionState subscriptions,
                        final FetchConfig fetchConfig,
                        final FetchBuffer fetchBuffer,
                        final FetchMetricsManager metricsManager,
                        final NetworkClientDelegate networkClientDelegate) {
        super(logContext, metadata, subscriptions, fetchConfig, fetchBuffer, metricsManager, time);
        this.networkClientDelegate = networkClientDelegate;
    }

    @Override
    protected boolean isUnavailable(Node node) {
        return networkClientDelegate.isUnavailable(node);
    }

    @Override
    protected void maybeThrowAuthFailure(Node node) {
        networkClientDelegate.maybeThrowAuthFailure(node);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public PollResult poll(long currentTimeMs) {
        return pollInternal(
                prepareFetchRequests(),
                this::handleFetchSuccess,
                this::handleFetchFailure
        );
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public PollResult pollOnClose() {
        return pollInternal(
                prepareCloseFetchSessionRequests(),
                this::handleCloseFetchSessionSuccess,
                this::handleCloseFetchSessionFailure
        );
    }

    /**
     * Creates the {@link PollResult poll result} that contains a list of zero or more
     * {@link FetchRequest.Builder fetch requests}.
     *
     * @param fetchRequests {@link Map} of {@link Node nodes} to their {@link FetchSessionHandler.FetchRequestData}
     * @param successHandler {@link ResponseHandler Handler for successful responses}
     * @param errorHandler {@link ResponseHandler Handler for failure responses}
     * @return {@link PollResult}
     */
    private PollResult pollInternal(Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests,
                                    ResponseHandler<ClientResponse> successHandler,
                                    ResponseHandler<Throwable> errorHandler) {
        List<UnsentRequest> requests = fetchRequests.entrySet().stream().map(entry -> {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
            final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
                if (error != null)
                    errorHandler.handle(fetchTarget, data, error);
                else
                    successHandler.handle(fetchTarget, data, clientResponse);
            };

            return new UnsentRequest(request, fetchTarget, responseHandler);
        }).collect(Collectors.toList());

        return new PollResult(requests);
    }
}
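pollInternal() above pairs every outgoing fetch request with a BiConsumer callback that routes the outcome to either a success handler or a failure handler. A minimal, self-contained sketch of that request/callback pairing (the Handler interface, method names, and target labels here are hypothetical, not Kafka APIs):

import java.util.function.BiConsumer;

public class ResponseDispatchSketch {

    // Hypothetical stand-ins for the success/failure handlers that a request
    // manager would supply for normal polls vs. close-time polls.
    interface Handler<T> {
        void handle(String target, T payload);
    }

    static <T> BiConsumer<T, Throwable> dispatch(String target,
                                                 Handler<T> onSuccess,
                                                 Handler<Throwable> onFailure) {
        return (response, error) -> {
            if (error != null)
                onFailure.handle(target, error);
            else
                onSuccess.handle(target, response);
        };
    }

    public static void main(String[] args) {
        BiConsumer<String, Throwable> callback = dispatch(
                "broker-0",
                (target, response) -> System.out.println("success from " + target + ": " + response),
                (target, error) -> System.out.println("failure from " + target + ": " + error));

        callback.accept("fetch response", null);            // success path
        callback.accept(null, new RuntimeException("boom")); // failure path
    }
}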
File diff suppressed because it is too large
@@ -0,0 +1,115 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

import java.io.Closeable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * An event handler that receives {@link ApplicationEvent application events} from the application thread which
 * are then readable from the {@link ApplicationEventProcessor} in the {@link ConsumerNetworkThread network thread}.
 */
public class ApplicationEventHandler implements Closeable {

    private final Logger log;
    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final ConsumerNetworkThread networkThread;
    private final IdempotentCloser closer = new IdempotentCloser();

    public ApplicationEventHandler(final LogContext logContext,
                                   final Time time,
                                   final BlockingQueue<ApplicationEvent> applicationEventQueue,
                                   final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
                                   final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
                                   final Supplier<RequestManagers> requestManagersSupplier) {
        this.log = logContext.logger(ApplicationEventHandler.class);
        this.applicationEventQueue = applicationEventQueue;
        this.networkThread = new ConsumerNetworkThread(logContext,
                time,
                applicationEventProcessorSupplier,
                networkClientDelegateSupplier,
                requestManagersSupplier);
        this.networkThread.start();
    }

    /**
     * Add an {@link ApplicationEvent} to the handler and then internally invoke {@link #wakeupNetworkThread}
     * to alert the network I/O thread that it has something to process.
     *
     * @param event An {@link ApplicationEvent} created by the application thread
     */
    public void add(final ApplicationEvent event) {
        Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
        log.trace("Enqueued event: {}", event);
        applicationEventQueue.add(event);
        wakeupNetworkThread();
    }

    /**
     * Wakeup the {@link ConsumerNetworkThread network I/O thread} to pull the next event(s) from the queue.
     */
    public void wakeupNetworkThread() {
        networkThread.wakeup();
    }

    /**
     * Add a {@link CompletableApplicationEvent} to the handler. The method blocks waiting for the result, and will
     * return the result value upon successful completion; otherwise throws an error.
     *
     * <p/>
     *
     * See {@link CompletableApplicationEvent#get(Timer)} and {@link Future#get(long, TimeUnit)} for more details.
     *
     * @param event A {@link CompletableApplicationEvent} created by the polling thread
     * @param timer Timer for which to wait for the event to complete
     * @return Value that is the result of the event
     * @param <T> Type of return value of the event
     */
    public <T> T addAndGet(final CompletableApplicationEvent<T> event, final Timer timer) {
        Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null");
        Objects.requireNonNull(timer, "Timer provided to addAndGet must be non-null");
        add(event);
        return event.get(timer);
    }

    @Override
    public void close() {
        close(Duration.ZERO);
    }

    public void close(final Duration timeout) {
        closer.close(
                () -> Utils.closeQuietly(() -> networkThread.close(timeout), "consumer network thread"),
                () -> log.warn("The application event handler was already closed")
        );
    }
}
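addAndGet() above is an enqueue-then-block handoff: the application thread adds a completable event to the shared queue, wakes the network thread, and blocks on the event's result with a bounded wait. A small hypothetical sketch of that pattern using only JDK types (the Event class and payload strings are illustrative, not part of the patch):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AddAndGetSketch {

    // Hypothetical completable event: a payload plus the future the caller blocks on.
    static class Event {
        final String payload;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Event(String payload) {
            this.payload = payload;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

        // Stand-in for the network thread: drain an event and complete its future.
        Thread worker = new Thread(() -> {
            try {
                Event event = queue.take();
                event.future.complete("processed " + event.payload);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker");
        worker.start();

        // Stand-in for addAndGet(): enqueue, then block with a bounded wait.
        Event event = new Event("offsets-request");
        queue.add(event);
        String result = event.future.get(1, TimeUnit.SECONDS);
        System.out.println(result);
        worker.join();
    }
}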
@@ -0,0 +1,52 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

import java.util.Objects;
import java.util.Queue;

/**
 * An event handler that receives {@link BackgroundEvent background events} from the
 * {@link ConsumerNetworkThread network thread} which are then made available to the application thread
 * via the {@link BackgroundEventProcessor}.
 */
public class BackgroundEventHandler {

    private final Logger log;
    private final Queue<BackgroundEvent> backgroundEventQueue;

    public BackgroundEventHandler(final LogContext logContext, final Queue<BackgroundEvent> backgroundEventQueue) {
        this.log = logContext.logger(BackgroundEventHandler.class);
        this.backgroundEventQueue = backgroundEventQueue;
    }

    /**
     * Add a {@link BackgroundEvent} to the handler.
     *
     * @param event A {@link BackgroundEvent} created by the {@link ConsumerNetworkThread network thread}
     */
    public void add(BackgroundEvent event) {
        Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
        log.trace("Enqueued event: {}", event);
        backgroundEventQueue.add(event);
    }
}
@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.LogContext;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/**
 * An {@link EventProcessor} that is created and executes in the application thread for the purpose of processing
 * {@link BackgroundEvent background events} generated by the {@link ConsumerNetworkThread network thread}.
 * Those events are generally of two types:
 *
 * <ul>
 *     <li>Errors that occur in the network thread that need to be propagated to the application thread</li>
 *     <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
 * </ul>
 */
public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {

    public BackgroundEventProcessor(final LogContext logContext,
                                    final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
        super(logContext, backgroundEventQueue);
    }

    /**
     * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
     * It is possible that {@link ErrorBackgroundEvent an error} could occur when processing the events.
     * In such cases, the processor will take a reference to the first error, continue to process the
     * remaining events, and then throw the first error that occurred.
     */
    @Override
    public void process() {
        AtomicReference<KafkaException> firstError = new AtomicReference<>();
        process((event, error) -> firstError.compareAndSet(null, error));

        if (firstError.get() != null)
            throw firstError.get();
    }

    @Override
    public void process(final BackgroundEvent event) {
        if (event.type() == BackgroundEvent.Type.ERROR)
            process((ErrorBackgroundEvent) event);
        else
            throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
    }

    @Override
    protected Class<BackgroundEvent> getEventClass() {
        return BackgroundEvent.class;
    }

    private void process(final ErrorBackgroundEvent event) {
        throw event.error();
    }
}
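The process() override above records only the first error (via AtomicReference.compareAndSet), keeps processing the remaining events, and rethrows that first error at the end. A tiny stand-alone sketch of the same pattern, with hypothetical event names:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class FirstErrorSketch {

    public static void main(String[] args) {
        List<String> events = Arrays.asList("ok-1", "fail-2", "ok-3", "fail-4");
        AtomicReference<RuntimeException> firstError = new AtomicReference<>();

        for (String event : events) {
            try {
                if (event.startsWith("fail"))
                    throw new RuntimeException("error processing " + event);
                System.out.println("processed " + event);
            } catch (RuntimeException e) {
                // compareAndSet keeps only the first error; later errors are dropped.
                firstError.compareAndSet(null, e);
            }
        }

        // All events were attempted; now surface the first failure, if any.
        if (firstError.get() != null)
            throw firstError.get();
    }
}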
@@ -0,0 +1,25 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import java.util.concurrent.CompletableFuture;

public interface CompletableEvent<T> {

    CompletableFuture<T> future();

}
@@ -1,65 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.common.utils.Timer;

import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * This class interfaces with the KafkaConsumer and the background thread. It allows the caller to enqueue events via
 * the {@code add()} method and to retrieve events via the {@code poll()} method.
 */
public interface EventHandler extends Closeable {
    /**
     * Retrieves and removes a {@link BackgroundEvent}. Returns an empty Optional instance if there is nothing.
     * @return an Optional of {@link BackgroundEvent} if the value is present. Otherwise, an empty Optional.
     */
    Optional<BackgroundEvent> poll();

    /**
     * Check whether there are pending {@code BackgroundEvent} await to be consumed.
     * @return true if there are no pending event
     */
    boolean isEmpty();

    /**
     * Add an {@link ApplicationEvent} to the handler. The method returns true upon successful add; otherwise returns
     * false.
     * @param event An {@link ApplicationEvent} created by the polling thread.
     * @return true upon successful add.
     */
    boolean add(ApplicationEvent event);

    /**
     * Add a {@link CompletableApplicationEvent} to the handler. The method blocks waiting for the result, and will
     * return the result value upon successful completion; otherwise throws an error.
     *
     * <p/>
     *
     * See {@link CompletableApplicationEvent#get(Timer)} and {@link Future#get(long, TimeUnit)} for more details.
     *
     * @param event A {@link CompletableApplicationEvent} created by the polling thread.
     * @param timer Timer for which to wait for the event to complete
     * @return Value that is the result of the event
     * @param <T> Type of return value of the event
     */
    <T> T addAndGet(final CompletableApplicationEvent<T> event, final Timer timer);
}
@@ -0,0 +1,137 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.clients.consumer.internals.events; |
||||
|
||||
import org.apache.kafka.common.KafkaException; |
||||
import org.apache.kafka.common.internals.IdempotentCloser; |
||||
import org.apache.kafka.common.utils.LogContext; |
||||
import org.slf4j.Logger; |
||||
|
||||
import java.io.Closeable; |
||||
import java.util.LinkedList; |
||||
import java.util.List; |
||||
import java.util.Objects; |
||||
import java.util.concurrent.BlockingQueue; |
||||
|
||||
/** |
||||
* An {@link EventProcessor} is the means by which events <em>produced</em> by thread <em>A</em> are |
||||
* <em>processed</em> by thread <em>B</em>. By definition, threads <em>A</em> and <em>B</em> run in parallel to |
||||
* each other, so a mechanism is needed with which to receive and process the events from the other thread. That |
||||
* communication channel is formed around {@link BlockingQueue a shared queue} into which thread <em>A</em> |
||||
* enqueues events and thread <em>B</em> reads and processes those events. |
||||
*/ |
||||
public abstract class EventProcessor<T> implements Closeable { |
||||
|
||||
private final Logger log; |
||||
private final BlockingQueue<T> eventQueue; |
||||
private final IdempotentCloser closer; |
||||
|
||||
protected EventProcessor(final LogContext logContext, final BlockingQueue<T> eventQueue) { |
||||
this.log = logContext.logger(EventProcessor.class); |
||||
this.eventQueue = eventQueue; |
||||
this.closer = new IdempotentCloser(); |
||||
} |
||||
|
||||
public abstract void process(); |
||||
|
||||
public abstract void process(T event); |
||||
|
||||
@Override |
||||
public void close() { |
||||
closer.close(this::closeInternal, () -> log.warn("The event processor was already closed")); |
||||
} |
||||
|
||||
protected abstract Class<T> getEventClass(); |
||||
|
||||
protected interface ProcessErrorHandler<T> { |
||||
|
||||
void onError(T event, KafkaException error); |
||||
} |
||||
|
||||
/** |
||||
* Drains all available events from the queue, and then processes them in order. If any errors are thrown while |
||||
* processing the individual events, these are submitted to the given {@link ProcessErrorHandler}. |
||||
*/ |
||||
protected void process(ProcessErrorHandler<T> processErrorHandler) { |
||||
String eventClassName = getEventClass().getSimpleName(); |
||||
closer.assertOpen(() -> String.format("The processor was previously closed, so no further %s processing can occur", eventClassName)); |
||||
|
||||
List<T> events = drain(); |
||||
|
||||
try { |
||||
log.debug("Starting processing of {} {}(s)", events.size(), eventClassName); |
||||
|
||||
for (T event : events) { |
||||
try { |
||||
Objects.requireNonNull(event, () -> String.format("Attempted to process a null %s", eventClassName)); |
||||
log.debug("Consuming {}: {}", eventClassName, event); |
||||
process(event); |
||||
} catch (Throwable t) { |
||||
log.warn("An error occurred when processing the {}: {}", eventClassName, t.getMessage(), t); |
||||
|
||||
KafkaException error; |
||||
|
||||
if (t instanceof KafkaException) |
||||
error = (KafkaException) t; |
||||
else |
||||
error = new KafkaException(t); |
||||
|
||||
processErrorHandler.onError(event, error); |
||||
} |
||||
} |
||||
} finally { |
||||
log.debug("Completed processing of {} {}(s)", events.size(), eventClassName); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* It is possible for the consumer to close before complete processing all the events in the queue. In |
||||
* this case, we need to throw an exception to notify the user the consumer is closed. |
||||
*/ |
private void closeInternal() { |
String eventClassName = getEventClass().getSimpleName(); |
log.trace("Closing event processor for {}", eventClassName); |
List<T> incompleteEvents = drain(); |
 |
if (incompleteEvents.isEmpty()) |
return; |
 |
KafkaException exception = new KafkaException("The consumer is closed"); |
 |
// Check each of the events and if it has a Future that is incomplete, complete it exceptionally. |
incompleteEvents |
.stream() |
.filter(e -> e instanceof CompletableEvent) |
.map(e -> ((CompletableEvent<?>) e).future()) |
.filter(f -> !f.isDone()) |
.forEach(f -> { |
log.debug("Completing {} with exception {}", f, exception.getMessage()); |
f.completeExceptionally(exception); |
}); |
 |
log.debug("Discarding {} {}s because the consumer is closing", incompleteEvents.size(), eventClassName); |
} |
 |
/** |
 * Moves all the events from the queue to the returned list. |
 */ |
private List<T> drain() { |
LinkedList<T> events = new LinkedList<>(); |
eventQueue.drainTo(events); |
return events; |
} |
} |
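
For orientation only, here is a minimal sketch (not part of this change) of how a concrete subclass might wire the template methods together. The LoggingEventProcessor name and its console output are invented for illustration; ApplicationEvent, LogContext, BlockingQueue and the ProcessErrorHandler callback are the types used above. The protected process(ProcessErrorHandler) overload already does the draining and error routing, so a subclass only decides what to do with each event and with each failure.

// Hypothetical illustration, not part of this patch.
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.common.utils.LogContext;

public class LoggingEventProcessor extends EventProcessor<ApplicationEvent> {

    public LoggingEventProcessor(LogContext logContext, BlockingQueue<ApplicationEvent> queue) {
        super(logContext, queue);
    }

    @Override
    public void process() {
        // Drain the shared queue; report any per-event failure via the error handler callback.
        process((event, error) -> System.err.println("Failed to process " + event + ": " + error));
    }

    @Override
    public void process(ApplicationEvent event) {
        System.out.println("Processing " + event);
    }

    @Override
    protected Class<ApplicationEvent> getEventClass() {
        return ApplicationEvent.class;
    }
}
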
@@ -1,62 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.clients.consumer.internals.events; |
||||
|
||||
import java.util.Objects; |
||||
|
||||
/** |
||||
* The event is a no-op, but is intentionally left here for demonstration and test purposes. |
||||
*/ |
||||
public class NoopApplicationEvent extends ApplicationEvent { |
||||
|
||||
private final String message; |
||||
|
||||
public NoopApplicationEvent(final String message) { |
||||
super(Type.NOOP); |
||||
this.message = Objects.requireNonNull(message); |
||||
} |
||||
|
||||
public String message() { |
||||
return message; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object o) { |
||||
if (this == o) return true; |
||||
if (o == null || getClass() != o.getClass()) return false; |
||||
if (!super.equals(o)) return false; |
||||
|
||||
NoopApplicationEvent that = (NoopApplicationEvent) o; |
||||
|
||||
return message.equals(that.message); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
int result = super.hashCode(); |
||||
result = 31 * result + message.hashCode(); |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "NoopApplicationEvent{" + |
||||
toStringBase() + |
||||
",message='" + message + '\'' + |
||||
'}'; |
||||
} |
||||
} |
@@ -1,62 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.clients.consumer.internals.events; |
||||
|
||||
import java.util.Objects; |
||||
|
||||
/** |
||||
* No-op event. Intentionally left it here for demonstration purpose. |
||||
*/ |
||||
public class NoopBackgroundEvent extends BackgroundEvent { |
||||
|
||||
private final String message; |
||||
|
||||
public NoopBackgroundEvent(final String message) { |
||||
super(Type.NOOP); |
||||
this.message = Objects.requireNonNull(message); |
||||
} |
||||
|
||||
public String message() { |
||||
return message; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object o) { |
||||
if (this == o) return true; |
||||
if (o == null || getClass() != o.getClass()) return false; |
||||
if (!super.equals(o)) return false; |
||||
|
||||
NoopBackgroundEvent that = (NoopBackgroundEvent) o; |
||||
|
||||
return message.equals(that.message); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
int result = super.hashCode(); |
||||
result = 31 * result + message.hashCode(); |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "NoopBackgroundEvent{" + |
||||
toStringBase() + |
||||
", message='" + message + '\'' + |
||||
'}'; |
||||
} |
||||
} |
@@ -0,0 +1,269 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.clients.consumer.internals; |
||||
|
||||
import org.apache.kafka.clients.MockClient; |
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; |
||||
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; |
||||
import org.apache.kafka.common.TopicPartition; |
||||
import org.apache.kafka.common.message.FindCoordinatorRequestData; |
||||
import org.apache.kafka.common.requests.FindCoordinatorRequest; |
||||
import org.apache.kafka.common.requests.MetadataResponse; |
||||
import org.apache.kafka.common.requests.RequestTestUtils; |
||||
import org.apache.kafka.common.utils.Time; |
||||
import org.apache.kafka.test.TestCondition; |
||||
import org.apache.kafka.test.TestUtils; |
||||
import org.junit.jupiter.api.AfterEach; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
import java.time.Duration; |
||||
import java.util.ArrayList; |
||||
import java.util.Collections; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.BlockingQueue; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; |
||||
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; |
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; |
||||
import static org.junit.jupiter.api.Assertions.assertEquals; |
||||
import static org.junit.jupiter.api.Assertions.assertFalse; |
||||
import static org.junit.jupiter.api.Assertions.assertTrue; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.anyLong; |
||||
import static org.mockito.ArgumentMatchers.eq; |
||||
import static org.mockito.Mockito.doThrow; |
||||
import static org.mockito.Mockito.spy; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
@SuppressWarnings("ClassDataAbstractionCoupling") |
||||
public class ConsumerNetworkThreadTest { |
||||
|
||||
private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder; |
||||
private Time time; |
||||
private ConsumerMetadata metadata; |
||||
private NetworkClientDelegate networkClient; |
||||
private BlockingQueue<ApplicationEvent> applicationEventsQueue; |
||||
private ApplicationEventProcessor applicationEventProcessor; |
||||
private OffsetsRequestManager offsetsRequestManager; |
||||
private CommitRequestManager commitManager; |
||||
private ConsumerNetworkThread consumerNetworkThread; |
||||
private MockClient client; |
||||
|
||||
@BeforeEach |
||||
public void setup() { |
||||
testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder(); |
||||
time = testBuilder.time; |
||||
metadata = testBuilder.metadata; |
||||
networkClient = testBuilder.networkClientDelegate; |
||||
client = testBuilder.client; |
||||
applicationEventsQueue = testBuilder.applicationEventQueue; |
||||
applicationEventProcessor = testBuilder.applicationEventProcessor; |
||||
commitManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); |
||||
offsetsRequestManager = testBuilder.offsetsRequestManager; |
||||
consumerNetworkThread = testBuilder.consumerNetworkThread; |
||||
consumerNetworkThread.initializeResources(); |
||||
} |
||||
|
||||
@AfterEach |
||||
public void tearDown() { |
||||
if (testBuilder != null) |
||||
testBuilder.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void testStartupAndTearDown() throws InterruptedException { |
||||
// The consumer is closed in ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder.close()
|
||||
// which is called from tearDown().
|
||||
consumerNetworkThread.start(); |
||||
|
||||
TestCondition isStarted = () -> consumerNetworkThread.isRunning(); |
||||
TestCondition isClosed = () -> !(consumerNetworkThread.isRunning() || consumerNetworkThread.isAlive()); |
||||
|
||||
// There's a nonzero amount of time between starting the thread and having it
|
||||
// begin to execute our code. Wait for a bit before checking...
|
||||
TestUtils.waitForCondition(isStarted, |
||||
"The consumer network thread did not start within " + DEFAULT_MAX_WAIT_MS + " ms"); |
||||
|
||||
consumerNetworkThread.close(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); |
||||
|
||||
TestUtils.waitForCondition(isClosed, |
||||
"The consumer network thread did not stop within " + DEFAULT_MAX_WAIT_MS + " ms"); |
||||
} |
||||
|
||||
@Test |
||||
public void testApplicationEvent() { |
||||
ApplicationEvent e = new CommitApplicationEvent(new HashMap<>()); |
||||
applicationEventsQueue.add(e); |
||||
consumerNetworkThread.runOnce(); |
||||
verify(applicationEventProcessor, times(1)).process(e); |
||||
} |
||||
|
||||
@Test |
||||
public void testMetadataUpdateEvent() { |
||||
ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); |
||||
applicationEventsQueue.add(e); |
||||
consumerNetworkThread.runOnce(); |
||||
verify(metadata).requestUpdateForNewTopics(); |
||||
} |
||||
|
||||
@Test |
||||
public void testCommitEvent() { |
||||
ApplicationEvent e = new CommitApplicationEvent(new HashMap<>()); |
||||
applicationEventsQueue.add(e); |
||||
consumerNetworkThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(CommitApplicationEvent.class)); |
||||
} |
||||
|
||||
@Test |
||||
public void testListOffsetsEventIsProcessed() { |
||||
Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L); |
||||
ApplicationEvent e = new ListOffsetsApplicationEvent(timestamps, true); |
||||
applicationEventsQueue.add(e); |
||||
consumerNetworkThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(ListOffsetsApplicationEvent.class)); |
||||
assertTrue(applicationEventsQueue.isEmpty()); |
||||
} |
||||
|
||||
@Test |
||||
public void testResetPositionsEventIsProcessed() { |
||||
ResetPositionsApplicationEvent e = new ResetPositionsApplicationEvent(); |
||||
applicationEventsQueue.add(e); |
||||
consumerNetworkThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); |
||||
assertTrue(applicationEventsQueue.isEmpty()); |
||||
} |
||||
|
||||
@Test |
||||
public void testResetPositionsProcessFailureIsIgnored() { |
||||
doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); |
||||
|
||||
ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent(); |
||||
applicationEventsQueue.add(event); |
||||
assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); |
||||
|
||||
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); |
||||
} |
||||
|
||||
@Test |
||||
public void testValidatePositionsEventIsProcessed() { |
||||
ValidatePositionsApplicationEvent e = new ValidatePositionsApplicationEvent(); |
||||
applicationEventsQueue.add(e); |
||||
consumerNetworkThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class)); |
||||
assertTrue(applicationEventsQueue.isEmpty()); |
||||
} |
||||
|
||||
@Test |
||||
public void testAssignmentChangeEvent() { |
||||
HashMap<TopicPartition, OffsetAndMetadata> offset = mockTopicPartitionOffset(); |
||||
|
||||
final long currentTimeMs = time.milliseconds(); |
||||
ApplicationEvent e = new AssignmentChangeApplicationEvent(offset, currentTimeMs); |
||||
applicationEventsQueue.add(e); |
||||
|
||||
consumerNetworkThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class)); |
||||
verify(networkClient, times(1)).poll(anyLong(), anyLong()); |
||||
verify(commitManager, times(1)).updateAutoCommitTimer(currentTimeMs); |
||||
verify(commitManager, times(1)).maybeAutoCommit(offset); |
||||
} |
||||
|
||||
@Test |
||||
void testFetchTopicMetadata() { |
||||
applicationEventsQueue.add(new TopicMetadataApplicationEvent("topic")); |
||||
consumerNetworkThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(TopicMetadataApplicationEvent.class)); |
||||
} |
||||
|
||||
@Test |
||||
void testPollResultTimer() { |
||||
NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest( |
||||
new FindCoordinatorRequest.Builder( |
||||
new FindCoordinatorRequestData() |
||||
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) |
||||
.setKey("foobar")), |
||||
Optional.empty()); |
||||
req.setTimer(time, DEFAULT_REQUEST_TIMEOUT_MS); |
||||
|
||||
// purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE upon success
|
||||
NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( |
||||
10, |
||||
Collections.singletonList(req)); |
||||
assertEquals(10, networkClient.addAll(success)); |
||||
|
||||
NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( |
||||
10, |
||||
new ArrayList<>()); |
||||
assertEquals(10, networkClient.addAll(failure)); |
||||
} |
||||
|
||||
@Test |
||||
void testRequestManagersArePolledOnce() { |
||||
consumerNetworkThread.runOnce(); |
||||
testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> verify(rm, times(1)).poll(anyLong()))); |
||||
verify(networkClient, times(1)).poll(anyLong(), anyLong()); |
||||
} |
||||
|
||||
@Test |
||||
void testEnsureMetadataUpdateOnPoll() { |
||||
MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); |
||||
client.prepareMetadataUpdate(metadataResponse); |
||||
metadata.requestUpdate(false); |
||||
consumerNetworkThread.runOnce(); |
||||
verify(metadata, times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), anyLong()); |
||||
} |
||||
|
||||
@Test |
||||
void testEnsureEventsAreCompleted() { |
||||
CompletableApplicationEvent<Void> event1 = spy(new CommitApplicationEvent(Collections.emptyMap())); |
||||
ApplicationEvent event2 = new CommitApplicationEvent(Collections.emptyMap()); |
||||
CompletableFuture<Void> future = new CompletableFuture<>(); |
||||
when(event1.future()).thenReturn(future); |
||||
applicationEventsQueue.add(event1); |
||||
applicationEventsQueue.add(event2); |
||||
assertFalse(future.isDone()); |
||||
assertFalse(applicationEventsQueue.isEmpty()); |
||||
|
||||
consumerNetworkThread.cleanup(); |
||||
assertTrue(future.isCompletedExceptionally()); |
||||
assertTrue(applicationEventsQueue.isEmpty()); |
||||
} |
||||
|
||||
private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() { |
||||
final TopicPartition t0 = new TopicPartition("t0", 2); |
||||
final TopicPartition t1 = new TopicPartition("t0", 3); |
||||
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>(); |
||||
topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L)); |
||||
topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L)); |
||||
return topicPartitionOffsets; |
||||
} |
||||
} |
@@ -0,0 +1,370 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.clients.consumer.internals; |
||||
|
||||
import org.apache.kafka.clients.ApiVersions; |
||||
import org.apache.kafka.clients.CommonClientConfigs; |
||||
import org.apache.kafka.clients.GroupRebalanceConfig; |
||||
import org.apache.kafka.clients.MockClient; |
||||
import org.apache.kafka.clients.consumer.ConsumerConfig; |
||||
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; |
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; |
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; |
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; |
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor; |
||||
import org.apache.kafka.common.internals.ClusterResourceListeners; |
||||
import org.apache.kafka.common.metrics.Metrics; |
||||
import org.apache.kafka.common.requests.MetadataResponse; |
||||
import org.apache.kafka.common.requests.RequestTestUtils; |
||||
import org.apache.kafka.common.serialization.StringDeserializer; |
||||
import org.apache.kafka.common.utils.LogContext; |
||||
import org.apache.kafka.common.utils.MockTime; |
||||
import org.apache.kafka.common.utils.Time; |
||||
|
||||
import java.io.Closeable; |
||||
import java.util.Collections; |
||||
import java.util.HashMap; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.Properties; |
||||
import java.util.concurrent.BlockingQueue; |
||||
import java.util.concurrent.LinkedBlockingQueue; |
||||
|
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; |
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; |
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; |
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; |
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager; |
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; |
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; |
||||
import static org.apache.kafka.common.utils.Utils.closeQuietly; |
||||
import static org.mockito.Mockito.spy; |
||||
|
||||
@SuppressWarnings("ClassDataAbstractionCoupling") |
||||
public class ConsumerTestBuilder implements Closeable { |
||||
|
||||
static final long DEFAULT_RETRY_BACKOFF_MS = 80; |
||||
static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; |
||||
static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; |
||||
static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000; |
||||
static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id"; |
||||
static final String DEFAULT_GROUP_ID = "group-id"; |
||||
static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; |
||||
static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; |
||||
|
||||
final LogContext logContext = new LogContext(); |
||||
final Time time = new MockTime(0); |
||||
public final BlockingQueue<ApplicationEvent> applicationEventQueue; |
||||
public final BlockingQueue<BackgroundEvent> backgroundEventQueue; |
||||
final ConsumerConfig config; |
||||
final long retryBackoffMs; |
||||
final SubscriptionState subscriptions; |
||||
final ConsumerMetadata metadata; |
||||
final FetchConfig fetchConfig; |
||||
final FetchBuffer fetchBuffer; |
||||
final Metrics metrics; |
||||
final FetchMetricsManager metricsManager; |
||||
final NetworkClientDelegate networkClientDelegate; |
||||
final OffsetsRequestManager offsetsRequestManager; |
||||
final Optional<CoordinatorRequestManager> coordinatorRequestManager; |
||||
final Optional<CommitRequestManager> commitRequestManager; |
||||
final Optional<HeartbeatRequestManager> heartbeatRequestManager; |
||||
final Optional<MembershipManager> membershipManager; |
||||
final Optional<HeartbeatRequestManager.HeartbeatRequestState> heartbeatRequestState; |
||||
final TopicMetadataRequestManager topicMetadataRequestManager; |
||||
final FetchRequestManager fetchRequestManager; |
||||
final RequestManagers requestManagers; |
||||
public final ApplicationEventProcessor applicationEventProcessor; |
||||
public final BackgroundEventProcessor backgroundEventProcessor; |
||||
public final BackgroundEventHandler backgroundEventHandler; |
||||
final MockClient client; |
||||
final Optional<GroupInformation> groupInfo; |
||||
|
||||
public ConsumerTestBuilder() { |
||||
this(Optional.empty()); |
||||
} |
||||
|
||||
public ConsumerTestBuilder(Optional<GroupInformation> groupInfo) { |
||||
this.groupInfo = groupInfo; |
||||
this.applicationEventQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventHandler = spy(new BackgroundEventHandler(logContext, backgroundEventQueue)); |
||||
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( |
||||
100, |
||||
DEFAULT_MAX_POLL_INTERVAL_MS, |
||||
DEFAULT_HEARTBEAT_INTERVAL_MS, |
||||
groupInfo.map(gi -> gi.groupState.groupId).orElse(null), |
||||
groupInfo.flatMap(gi -> gi.groupState.groupInstanceId), |
||||
DEFAULT_RETRY_BACKOFF_MS, |
||||
DEFAULT_RETRY_BACKOFF_MAX_MS, |
||||
true); |
||||
GroupState groupState = new GroupState(groupRebalanceConfig); |
||||
ApiVersions apiVersions = new ApiVersions(); |
||||
|
||||
Properties properties = new Properties(); |
||||
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
||||
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
||||
properties.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, DEFAULT_RETRY_BACKOFF_MS); |
||||
properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_REQUEST_TIMEOUT_MS); |
||||
properties.put(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG, DEFAULT_MAX_POLL_INTERVAL_MS); |
||||
|
||||
groupInfo.ifPresent(gi -> { |
||||
properties.put(GROUP_ID_CONFIG, gi.groupState.groupId); |
||||
gi.groupState.groupInstanceId.ifPresent(groupInstanceId -> properties.put(GROUP_INSTANCE_ID_CONFIG, groupInstanceId)); |
||||
}); |
||||
|
||||
this.config = new ConsumerConfig(properties); |
||||
|
||||
this.fetchConfig = new FetchConfig(config); |
||||
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); |
||||
final long requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); |
||||
this.metrics = createMetrics(config, time); |
||||
|
||||
this.subscriptions = spy(createSubscriptionState(config, logContext)); |
||||
this.metadata = spy(new ConsumerMetadata(config, subscriptions, logContext, new ClusterResourceListeners())); |
||||
this.metricsManager = createFetchMetricsManager(metrics); |
||||
|
||||
this.client = new MockClient(time, metadata); |
||||
MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() { |
||||
{ |
||||
String topic1 = "test1"; |
||||
put(topic1, 1); |
||||
String topic2 = "test2"; |
||||
put(topic2, 1); |
||||
} |
||||
}); |
||||
this.client.updateMetadata(metadataResponse); |
||||
|
||||
this.networkClientDelegate = spy(new NetworkClientDelegate(time, |
||||
config, |
||||
logContext, |
||||
client)); |
||||
this.offsetsRequestManager = spy(new OffsetsRequestManager(subscriptions, |
||||
metadata, |
||||
fetchConfig.isolationLevel, |
||||
time, |
||||
retryBackoffMs, |
||||
requestTimeoutMs, |
||||
apiVersions, |
||||
networkClientDelegate, |
||||
backgroundEventHandler, |
||||
logContext)); |
||||
|
||||
if (groupInfo.isPresent()) { |
||||
GroupInformation gi = groupInfo.get(); |
||||
CoordinatorRequestManager coordinator = spy(new CoordinatorRequestManager( |
||||
time, |
||||
logContext, |
||||
DEFAULT_RETRY_BACKOFF_MS, |
||||
DEFAULT_RETRY_BACKOFF_MAX_MS, |
||||
backgroundEventHandler, |
||||
gi.groupState.groupId |
||||
)); |
||||
CommitRequestManager commit = spy(new CommitRequestManager(time, |
||||
logContext, |
||||
subscriptions, |
||||
config, |
||||
coordinator, |
||||
groupState)); |
||||
MembershipManager mm = spy( |
||||
new MembershipManagerImpl( |
||||
gi.groupState.groupId, |
||||
gi.groupState.groupInstanceId.orElse(null), |
||||
null, |
||||
logContext |
||||
) |
||||
); |
||||
HeartbeatRequestManager.HeartbeatRequestState state = spy(new HeartbeatRequestManager.HeartbeatRequestState(logContext, |
||||
time, |
||||
gi.heartbeatIntervalMs, |
||||
retryBackoffMs, |
||||
DEFAULT_RETRY_BACKOFF_MAX_MS, |
||||
gi.heartbeatJitterMs)); |
||||
HeartbeatRequestManager heartbeat = spy(new HeartbeatRequestManager( |
||||
logContext, |
||||
time, |
||||
config, |
||||
coordinator, |
||||
subscriptions, |
||||
mm, |
||||
state, |
||||
backgroundEventHandler)); |
||||
|
||||
this.coordinatorRequestManager = Optional.of(coordinator); |
||||
this.commitRequestManager = Optional.of(commit); |
||||
this.heartbeatRequestManager = Optional.of(heartbeat); |
||||
this.heartbeatRequestState = Optional.of(state); |
||||
this.membershipManager = Optional.of(mm); |
||||
} else { |
||||
this.coordinatorRequestManager = Optional.empty(); |
||||
this.commitRequestManager = Optional.empty(); |
||||
this.heartbeatRequestManager = Optional.empty(); |
||||
this.heartbeatRequestState = Optional.empty(); |
||||
this.membershipManager = Optional.empty(); |
||||
} |
||||
|
||||
this.fetchBuffer = new FetchBuffer(logContext); |
||||
this.fetchRequestManager = spy(new FetchRequestManager(logContext, |
||||
time, |
||||
metadata, |
||||
subscriptions, |
||||
fetchConfig, |
||||
fetchBuffer, |
||||
metricsManager, |
||||
networkClientDelegate)); |
||||
this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager(logContext, |
||||
config)); |
||||
this.requestManagers = new RequestManagers(logContext, |
||||
offsetsRequestManager, |
||||
topicMetadataRequestManager, |
||||
fetchRequestManager, |
||||
coordinatorRequestManager, |
||||
commitRequestManager, |
||||
heartbeatRequestManager); |
||||
this.applicationEventProcessor = spy(new ApplicationEventProcessor( |
||||
logContext, |
||||
applicationEventQueue, |
||||
requestManagers, |
||||
metadata) |
||||
); |
||||
this.backgroundEventProcessor = spy(new BackgroundEventProcessor(logContext, backgroundEventQueue)); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
closeQuietly(requestManagers, RequestManagers.class.getSimpleName()); |
||||
closeQuietly(applicationEventProcessor, ApplicationEventProcessor.class.getSimpleName()); |
||||
closeQuietly(backgroundEventProcessor, BackgroundEventProcessor.class.getSimpleName()); |
||||
} |
||||
|
||||
public static class ConsumerNetworkThreadTestBuilder extends ConsumerTestBuilder { |
||||
|
||||
final ConsumerNetworkThread consumerNetworkThread; |
||||
|
||||
public ConsumerNetworkThreadTestBuilder() { |
||||
this(createDefaultGroupInformation()); |
||||
} |
||||
|
||||
public ConsumerNetworkThreadTestBuilder(Optional<GroupInformation> groupInfo) { |
||||
super(groupInfo); |
||||
this.consumerNetworkThread = new ConsumerNetworkThread( |
||||
logContext, |
||||
time, |
||||
() -> applicationEventProcessor, |
||||
() -> networkClientDelegate, |
||||
() -> requestManagers |
||||
); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
closeQuietly(consumerNetworkThread, ConsumerNetworkThread.class.getSimpleName()); |
||||
} |
||||
} |
||||
|
||||
public static class ApplicationEventHandlerTestBuilder extends ConsumerTestBuilder { |
||||
|
||||
public final ApplicationEventHandler applicationEventHandler; |
||||
|
||||
public ApplicationEventHandlerTestBuilder() { |
||||
this(createDefaultGroupInformation()); |
||||
} |
||||
|
||||
public ApplicationEventHandlerTestBuilder(Optional<GroupInformation> groupInfo) { |
||||
super(groupInfo); |
||||
this.applicationEventHandler = spy(new ApplicationEventHandler( |
||||
logContext, |
||||
time, |
||||
applicationEventQueue, |
||||
() -> applicationEventProcessor, |
||||
() -> networkClientDelegate, |
||||
() -> requestManagers)); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
closeQuietly(applicationEventHandler, ApplicationEventHandler.class.getSimpleName()); |
||||
} |
||||
} |
||||
|
||||
public static class PrototypeAsyncConsumerTestBuilder extends ApplicationEventHandlerTestBuilder { |
||||
|
||||
final PrototypeAsyncConsumer<String, String> consumer; |
||||
|
||||
public PrototypeAsyncConsumerTestBuilder(Optional<GroupInformation> groupInfo) { |
||||
super(groupInfo); |
||||
String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); |
||||
List<ConsumerPartitionAssignor> assignors = ConsumerPartitionAssignor.getAssignorInstances( |
||||
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), |
||||
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) |
||||
); |
||||
Deserializers<String, String> deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); |
||||
FetchCollector<String, String> fetchCollector = new FetchCollector<>(logContext, |
||||
metadata, |
||||
subscriptions, |
||||
fetchConfig, |
||||
deserializers, |
||||
metricsManager, |
||||
time); |
||||
this.consumer = spy(new PrototypeAsyncConsumer<>( |
||||
logContext, |
||||
clientId, |
||||
deserializers, |
||||
new FetchBuffer(logContext), |
||||
fetchCollector, |
||||
new ConsumerInterceptors<>(Collections.emptyList()), |
||||
time, |
||||
applicationEventHandler, |
||||
backgroundEventQueue, |
||||
metrics, |
||||
subscriptions, |
||||
metadata, |
||||
retryBackoffMs, |
||||
60000, |
||||
assignors, |
||||
groupInfo.map(groupInformation -> groupInformation.groupState.groupId).orElse(null))); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
consumer.close(); |
||||
} |
||||
} |
||||
|
||||
public static class GroupInformation { |
||||
|
||||
final GroupState groupState; |
||||
final int heartbeatIntervalMs; |
||||
final double heartbeatJitterMs; |
||||
|
||||
public GroupInformation(GroupState groupState) { |
||||
this(groupState, DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_HEARTBEAT_JITTER_MS); |
||||
} |
||||
|
||||
public GroupInformation(GroupState groupState, int heartbeatIntervalMs, double heartbeatJitterMs) { |
||||
this.groupState = groupState; |
||||
this.heartbeatIntervalMs = heartbeatIntervalMs; |
||||
this.heartbeatJitterMs = heartbeatJitterMs; |
||||
} |
||||
} |
||||
|
||||
static Optional<GroupInformation> createDefaultGroupInformation() { |
||||
return Optional.of(new GroupInformation(new GroupState(DEFAULT_GROUP_ID, Optional.empty()))); |
||||
} |
||||
} |
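
As a rough usage sketch (hypothetical, not part of this change), a test living in the same package could lean on the nested builder instead of wiring queues and request managers by hand. The class and method names below are invented; the builder fields, CommitApplicationEvent, and the JUnit/Mockito calls are the ones already used in this change.

// Hypothetical illustration, not part of this patch.
package org.apache.kafka.clients.consumer.internals;

import java.util.HashMap;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.junit.jupiter.api.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

class ExampleBuilderUsageTest {

    @Test
    void commitEventReachesTheProcessor() {
        // Closing the builder shuts down the network thread it created.
        try (ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder builder =
                     new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder()) {
            builder.consumerNetworkThread.initializeResources();
            builder.applicationEventQueue.add(new CommitApplicationEvent(new HashMap<>()));
            builder.consumerNetworkThread.runOnce();
            verify(builder.applicationEventProcessor).process(any(CommitApplicationEvent.class));
        }
    }
}
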
@@ -1,436 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.clients.consumer.internals; |
||||
|
||||
import org.apache.kafka.clients.GroupRebalanceConfig; |
||||
import org.apache.kafka.clients.consumer.ConsumerConfig; |
||||
import org.apache.kafka.clients.consumer.LogTruncationException; |
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; |
||||
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; |
||||
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; |
||||
import org.apache.kafka.common.TopicPartition; |
||||
import org.apache.kafka.common.errors.TopicAuthorizationException; |
||||
import org.apache.kafka.common.message.FindCoordinatorRequestData; |
||||
import org.apache.kafka.common.requests.FindCoordinatorRequest; |
||||
import org.apache.kafka.common.serialization.StringDeserializer; |
||||
import org.apache.kafka.common.utils.LogContext; |
||||
import org.apache.kafka.common.utils.MockTime; |
||||
import org.apache.kafka.common.utils.Time; |
||||
import org.apache.kafka.test.TestUtils; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.mockito.Mockito; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.Collections; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.Properties; |
||||
import java.util.concurrent.BlockingQueue; |
||||
import java.util.concurrent.LinkedBlockingQueue; |
||||
|
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; |
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; |
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; |
||||
import static org.junit.jupiter.api.Assertions.assertEquals; |
||||
import static org.junit.jupiter.api.Assertions.assertFalse; |
||||
import static org.junit.jupiter.api.Assertions.assertThrows; |
||||
import static org.junit.jupiter.api.Assertions.assertTrue; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.anyLong; |
||||
import static org.mockito.Mockito.doThrow; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.spy; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
@SuppressWarnings("ClassDataAbstractionCoupling") |
||||
public class DefaultBackgroundThreadTest { |
||||
private static final long RETRY_BACKOFF_MS = 100; |
||||
private final Properties properties = new Properties(); |
||||
private MockTime time; |
||||
private ConsumerMetadata metadata; |
||||
private NetworkClientDelegate networkClient; |
||||
private BlockingQueue<BackgroundEvent> backgroundEventsQueue; |
||||
private BlockingQueue<ApplicationEvent> applicationEventsQueue; |
||||
private ApplicationEventProcessor applicationEventProcessor; |
||||
private CoordinatorRequestManager coordinatorManager; |
||||
private OffsetsRequestManager offsetsRequestManager; |
||||
private ErrorEventHandler errorEventHandler; |
||||
private final int requestTimeoutMs = 500; |
||||
private GroupState groupState; |
||||
private CommitRequestManager commitManager; |
||||
private TopicMetadataRequestManager topicMetadataRequestManager; |
||||
private HeartbeatRequestManager heartbeatRequestManager; |
||||
|
||||
@BeforeEach |
||||
@SuppressWarnings("unchecked") |
||||
public void setup() { |
||||
this.time = new MockTime(0); |
||||
this.metadata = mock(ConsumerMetadata.class); |
||||
this.networkClient = mock(NetworkClientDelegate.class); |
||||
this.applicationEventsQueue = (BlockingQueue<ApplicationEvent>) mock(BlockingQueue.class); |
||||
this.backgroundEventsQueue = (BlockingQueue<BackgroundEvent>) mock(BlockingQueue.class); |
||||
this.applicationEventProcessor = mock(ApplicationEventProcessor.class); |
||||
this.coordinatorManager = mock(CoordinatorRequestManager.class); |
||||
this.offsetsRequestManager = mock(OffsetsRequestManager.class); |
||||
this.heartbeatRequestManager = mock(HeartbeatRequestManager.class); |
||||
this.errorEventHandler = mock(ErrorEventHandler.class); |
||||
GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig( |
||||
100, |
||||
100, |
||||
100, |
||||
"group_id", |
||||
Optional.empty(), |
||||
100, |
||||
1000, |
||||
true); |
||||
this.groupState = new GroupState(rebalanceConfig); |
||||
this.commitManager = mock(CommitRequestManager.class); |
||||
this.topicMetadataRequestManager = mock(TopicMetadataRequestManager.class); |
||||
} |
||||
|
||||
@Test |
||||
public void testStartupAndTearDown() throws InterruptedException { |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
backgroundThread.start(); |
||||
TestUtils.waitForCondition(backgroundThread::isRunning, "Failed awaiting for the background thread to be running"); |
||||
backgroundThread.close(); |
||||
assertFalse(backgroundThread.isRunning()); |
||||
} |
||||
|
||||
@Test |
||||
public void testApplicationEvent() { |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
ApplicationEvent e = new NoopApplicationEvent("noop event"); |
||||
this.applicationEventsQueue.add(e); |
||||
backgroundThread.runOnce(); |
||||
verify(applicationEventProcessor, times(1)).process(e); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void testMetadataUpdateEvent() { |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.applicationEventProcessor = new ApplicationEventProcessor( |
||||
this.backgroundEventsQueue, |
||||
mockRequestManagers(), |
||||
metadata); |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); |
||||
this.applicationEventsQueue.add(e); |
||||
backgroundThread.runOnce(); |
||||
verify(metadata).requestUpdateForNewTopics(); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void testCommitEvent() { |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
ApplicationEvent e = new CommitApplicationEvent(new HashMap<>()); |
||||
this.applicationEventsQueue.add(e); |
||||
backgroundThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(CommitApplicationEvent.class)); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
|
||||
@Test |
||||
public void testListOffsetsEventIsProcessed() { |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L); |
||||
ApplicationEvent e = new ListOffsetsApplicationEvent(timestamps, true); |
||||
this.applicationEventsQueue.add(e); |
||||
backgroundThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(ListOffsetsApplicationEvent.class)); |
||||
assertTrue(applicationEventsQueue.isEmpty()); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void testResetPositionsEventIsProcessed() { |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
ResetPositionsApplicationEvent e = new ResetPositionsApplicationEvent(); |
||||
this.applicationEventsQueue.add(e); |
||||
backgroundThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); |
||||
assertTrue(applicationEventsQueue.isEmpty()); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void testResetPositionsProcessFailure() { |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
applicationEventProcessor = spy(new ApplicationEventProcessor( |
||||
this.backgroundEventsQueue, |
||||
mockRequestManagers(), |
||||
metadata)); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
|
||||
TopicAuthorizationException authException = new TopicAuthorizationException("Topic authorization failed"); |
||||
doThrow(authException).when(offsetsRequestManager).resetPositionsIfNeeded(); |
||||
|
||||
ResetPositionsApplicationEvent event = new ResetPositionsApplicationEvent(); |
||||
this.applicationEventsQueue.add(event); |
||||
assertThrows(TopicAuthorizationException.class, backgroundThread::runOnce); |
||||
|
||||
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class)); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void testValidatePositionsEventIsProcessed() { |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
ValidatePositionsApplicationEvent e = new ValidatePositionsApplicationEvent(); |
||||
this.applicationEventsQueue.add(e); |
||||
backgroundThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class)); |
||||
assertTrue(applicationEventsQueue.isEmpty()); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void testValidatePositionsProcessFailure() { |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
applicationEventProcessor = spy(new ApplicationEventProcessor( |
||||
this.backgroundEventsQueue, |
||||
mockRequestManagers(), |
||||
metadata)); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
|
||||
LogTruncationException logTruncationException = new LogTruncationException(Collections.emptyMap(), Collections.emptyMap()); |
||||
doThrow(logTruncationException).when(offsetsRequestManager).validatePositionsIfNeeded(); |
||||
|
||||
ValidatePositionsApplicationEvent event = new ValidatePositionsApplicationEvent(); |
||||
this.applicationEventsQueue.add(event); |
||||
assertThrows(LogTruncationException.class, backgroundThread::runOnce); |
||||
|
||||
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class)); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
public void testAssignmentChangeEvent() { |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.backgroundEventsQueue = new LinkedBlockingQueue<>(); |
||||
this.applicationEventProcessor = spy(new ApplicationEventProcessor( |
||||
this.backgroundEventsQueue, |
||||
mockRequestManagers(), |
||||
metadata)); |
||||
|
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
HashMap<TopicPartition, OffsetAndMetadata> offset = mockTopicPartitionOffset(); |
||||
|
||||
final long currentTimeMs = time.milliseconds(); |
||||
ApplicationEvent e = new AssignmentChangeApplicationEvent(offset, currentTimeMs); |
||||
this.applicationEventsQueue.add(e); |
||||
|
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
|
||||
backgroundThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class)); |
||||
verify(networkClient, times(1)).poll(anyLong(), anyLong()); |
||||
verify(commitManager, times(1)).updateAutoCommitTimer(currentTimeMs); |
||||
verify(commitManager, times(1)).maybeAutoCommit(offset); |
||||
|
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
void testFindCoordinator() { |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
backgroundThread.runOnce(); |
||||
Mockito.verify(coordinatorManager, times(1)).poll(anyLong()); |
||||
Mockito.verify(networkClient, times(1)).poll(anyLong(), anyLong()); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
void testFetchTopicMetadata() { |
||||
this.applicationEventsQueue = new LinkedBlockingQueue<>(); |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); |
||||
when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); |
||||
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
when(heartbeatRequestManager.poll(anyLong())).thenReturn(emptyPollResults()); |
||||
this.applicationEventsQueue.add(new TopicMetadataApplicationEvent("topic")); |
||||
backgroundThread.runOnce(); |
||||
verify(applicationEventProcessor).process(any(TopicMetadataApplicationEvent.class)); |
||||
backgroundThread.close(); |
||||
} |
||||
|
||||
@Test |
||||
void testPollResultTimer() { |
||||
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); |
||||
// purposely setting a non MAX time to ensure it is returning Long.MAX_VALUE upon success
|
||||
NetworkClientDelegate.PollResult success = new NetworkClientDelegate.PollResult( |
||||
10, |
||||
Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs))); |
||||
assertEquals(10, backgroundThread.handlePollResult(success)); |
||||
|
||||
NetworkClientDelegate.PollResult failure = new NetworkClientDelegate.PollResult( |
||||
10, |
||||
new ArrayList<>()); |
||||
assertEquals(10, backgroundThread.handlePollResult(failure)); |
||||
} |
||||
|
||||
private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() { |
||||
final TopicPartition t0 = new TopicPartition("t0", 2); |
||||
final TopicPartition t1 = new TopicPartition("t0", 3); |
||||
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>(); |
||||
topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L)); |
||||
topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L)); |
||||
return topicPartitionOffsets; |
||||
} |
||||
|
||||
private RequestManagers mockRequestManagers() { |
||||
return new RequestManagers( |
||||
offsetsRequestManager, |
||||
topicMetadataRequestManager, |
||||
Optional.of(coordinatorManager), |
||||
Optional.of(commitManager), |
||||
Optional.of(heartbeatRequestManager)); |
||||
} |
||||
|
||||
private static NetworkClientDelegate.UnsentRequest findCoordinatorUnsentRequest( |
||||
final Time time, |
||||
final long timeout |
||||
) { |
||||
NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest( |
||||
new FindCoordinatorRequest.Builder( |
||||
new FindCoordinatorRequestData() |
||||
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) |
||||
.setKey("foobar")), |
||||
Optional.empty()); |
||||
req.setTimer(time, timeout); |
||||
return req; |
||||
} |
||||
|
||||
private DefaultBackgroundThread mockBackgroundThread() { |
||||
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
||||
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
||||
properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS); |
||||
|
||||
return new DefaultBackgroundThread( |
||||
this.time, |
||||
new ConsumerConfig(properties), |
||||
new LogContext(), |
||||
applicationEventsQueue, |
||||
backgroundEventsQueue, |
||||
this.errorEventHandler, |
||||
applicationEventProcessor, |
||||
this.metadata, |
||||
this.networkClient, |
||||
this.groupState, |
||||
this.coordinatorManager, |
||||
this.commitManager, |
||||
this.offsetsRequestManager, |
||||
this.topicMetadataRequestManager, |
||||
this.heartbeatRequestManager); |
||||
} |
||||
|
||||
private NetworkClientDelegate.PollResult mockPollCoordinatorResult() { |
||||
return new NetworkClientDelegate.PollResult( |
||||
RETRY_BACKOFF_MS, |
||||
Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs))); |
||||
} |
||||
|
||||
private NetworkClientDelegate.PollResult mockPollCommitResult() { |
||||
return new NetworkClientDelegate.PollResult( |
||||
RETRY_BACKOFF_MS, |
||||
Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs))); |
||||
} |
||||
|
||||
private NetworkClientDelegate.PollResult emptyPollResults() { |
||||
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList()); |
||||
} |
||||
} |
@@ -1,81 +0,0 @@
|
||||
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class DefaultEventHandlerTest {
    private int sessionTimeoutMs = 1000;
    private int rebalanceTimeoutMs = 1000;
    private int heartbeatIntervalMs = 1000;
    private String groupId = "g-1";
    private Optional<String> groupInstanceId = Optional.of("g-1");
    private long retryBackoffMs = 1000;
    private final Properties properties = new Properties();
    private GroupRebalanceConfig rebalanceConfig;

    @BeforeEach
    public void setup() {
        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(RETRY_BACKOFF_MS_CONFIG, "100");

        this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
            rebalanceTimeoutMs,
            heartbeatIntervalMs,
            groupId,
            groupInstanceId,
            retryBackoffMs,
            retryBackoffMs,
            true);
    }

    @Test
    public void testBasicHandlerOps() {
        final DefaultBackgroundThread bt = mock(DefaultBackgroundThread.class);
        final BlockingQueue<ApplicationEvent> aq = new LinkedBlockingQueue<>();
        final BlockingQueue<BackgroundEvent> bq = new LinkedBlockingQueue<>();
        final DefaultEventHandler handler = new DefaultEventHandler(bt, aq, bq);
        assertTrue(handler.isEmpty());
        assertFalse(handler.poll().isPresent());
        handler.add(new NoopApplicationEvent("test"));
        assertEquals(1, aq.size());
        handler.close();
        verify(bt, times(1)).close();
    }
}
File diff suppressed because it is too large
File diff suppressed because it is too large
@@ -0,0 +1,141 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder;
import org.apache.kafka.common.KafkaException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.BlockingQueue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class BackgroundEventHandlerTest {

    private ConsumerTestBuilder testBuilder;
    private BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private BackgroundEventHandler backgroundEventHandler;
    private BackgroundEventProcessor backgroundEventProcessor;

    @BeforeEach
    public void setup() {
        testBuilder = new ConsumerTestBuilder();
        backgroundEventQueue = testBuilder.backgroundEventQueue;
        backgroundEventHandler = testBuilder.backgroundEventHandler;
        backgroundEventProcessor = testBuilder.backgroundEventProcessor;
    }

    @AfterEach
    public void tearDown() {
        if (testBuilder != null)
            testBuilder.close();
    }

    @Test
    public void testNoEvents() {
        assertTrue(backgroundEventQueue.isEmpty());
        backgroundEventProcessor.process((event, error) -> { });
        assertTrue(backgroundEventQueue.isEmpty());
    }

    @Test
    public void testSingleEvent() {
        BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A"));
        backgroundEventQueue.add(event);
        assertPeeked(event);
        backgroundEventProcessor.process((e, error) -> { });
        assertTrue(backgroundEventQueue.isEmpty());
    }

    @Test
    public void testSingleErrorEvent() {
        KafkaException error = new KafkaException("error");
        BackgroundEvent event = new ErrorBackgroundEvent(error);
        backgroundEventHandler.add(new ErrorBackgroundEvent(error));
        assertPeeked(event);
        assertProcessThrows(error);
    }

    @Test
    public void testMultipleEvents() {
        BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A"));
        backgroundEventQueue.add(event1);
        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));

        assertPeeked(event1);
        backgroundEventProcessor.process((event, error) -> { });
        assertTrue(backgroundEventQueue.isEmpty());
    }

    @Test
    public void testMultipleErrorEvents() {
        Throwable error1 = new Throwable("error1");
        KafkaException error2 = new KafkaException("error2");
        KafkaException error3 = new KafkaException("error3");

        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));

        assertProcessThrows(new KafkaException(error1));
    }

    @Test
    public void testMixedEventsWithErrorEvents() {
        Throwable error1 = new Throwable("error1");
        KafkaException error2 = new KafkaException("error2");
        KafkaException error3 = new KafkaException("error3");

        RuntimeException errorToCheck = new RuntimeException("A");
        backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck));
        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D")));

        assertProcessThrows(new KafkaException(errorToCheck));
    }

    private void assertPeeked(BackgroundEvent event) {
        BackgroundEvent peekEvent = backgroundEventQueue.peek();
        assertNotNull(peekEvent);
        assertEquals(event, peekEvent);
    }

    private void assertProcessThrows(Throwable error) {
        assertFalse(backgroundEventQueue.isEmpty());

        try {
            backgroundEventProcessor.process();
            fail("Should have thrown error: " + error);
        } catch (Throwable t) {
            assertEquals(error.getClass(), t.getClass());
            assertEquals(error.getMessage(), t.getMessage());
        }

        assertTrue(backgroundEventQueue.isEmpty());
    }
}