Browse Source

KAFKA-5324; AdminClient: add close with timeout, fix some timeout bugs

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3141 from cmccabe/KAFKA-5324
pull/3141/merge
Colin P. Mccabe 8 years ago committed by Ismael Juma
parent
commit
3250cc767e
  1. 29
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
  2. 240
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  3. 2
      clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
  4. 59
      core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala

29
clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java

@ -23,6 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; @@ -23,6 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics,
@ -36,8 +37,8 @@ public abstract class AdminClient implements AutoCloseable { @@ -36,8 +37,8 @@ public abstract class AdminClient implements AutoCloseable {
/**
* Create a new AdminClient with the given configuration.
*
* @param props The configuration.
* @return The new KafkaAdminClient.
* @param props The configuration.
* @return The new KafkaAdminClient.
*/
public static AdminClient create(Properties props) {
return KafkaAdminClient.createInternal(new AdminClientConfig(props));
@ -46,8 +47,8 @@ public abstract class AdminClient implements AutoCloseable { @@ -46,8 +47,8 @@ public abstract class AdminClient implements AutoCloseable {
/**
* Create a new AdminClient with the given configuration.
*
* @param conf The configuration.
* @return The new KafkaAdminClient.
* @param conf The configuration.
* @return The new KafkaAdminClient.
*/
public static AdminClient create(Map<String, Object> conf) {
return KafkaAdminClient.createInternal(new AdminClientConfig(conf));
@ -55,8 +56,26 @@ public abstract class AdminClient implements AutoCloseable { @@ -55,8 +56,26 @@ public abstract class AdminClient implements AutoCloseable {
/**
* Close the AdminClient and release all associated resources.
*
* See {@link AdminClient#close(long, TimeUnit)}
*/
@Override
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
/**
* Close the AdminClient and release all associated resources.
*
* The close operation has a grace period during which current operations will be allowed to
* complete, specified by the given duration and time unit.
* New operations will not be accepted during the grace period. Once the grace period is over,
* all operations that have not yet been completed will be aborted with a TimeoutException.
*
* @param duration The duration to use for the wait time.
* @param unit The time unit to use for the wait time.
*/
public abstract void close();
public abstract void close(long duration, TimeUnit unit);
/**
* Create a batch of new topics with the default options.

240
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -97,6 +97,7 @@ import java.util.Map; @@ -97,6 +97,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
@ -123,6 +124,11 @@ public class KafkaAdminClient extends AdminClient { @@ -123,6 +124,11 @@ public class KafkaAdminClient extends AdminClient {
*/
private static final String JMX_PREFIX = "kafka.admin.client";
/**
* An invalid shutdown time which indicates that a shutdown has not yet been performed.
*/
private static final long INVALID_SHUTDOWN_TIME = -1;
/**
* The default timeout to use for an operation.
*/
@ -164,9 +170,10 @@ public class KafkaAdminClient extends AdminClient { @@ -164,9 +170,10 @@ public class KafkaAdminClient extends AdminClient {
private final Thread thread;
/**
* True if this client is closed.
* During a close operation, this is the time at which we will time out all pending operations
* and force the RPC thread to exit. If the admin client is not closing, this will be 0.
*/
private volatile boolean closed = false;
private final AtomicLong hardShutdownTimeMs = new AtomicLong(INVALID_SHUTDOWN_TIME);
/**
* Get or create a list value from a map.
@ -289,12 +296,12 @@ public class KafkaAdminClient extends AdminClient { @@ -289,12 +296,12 @@ public class KafkaAdminClient extends AdminClient {
selector,
metadata,
clientId,
100,
1,
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
(int) TimeUnit.HOURS.toMillis(1),
time,
true,
apiVersions);
@ -309,7 +316,7 @@ public class KafkaAdminClient extends AdminClient { @@ -309,7 +316,7 @@ public class KafkaAdminClient extends AdminClient {
}
}
static KafkaAdminClient create(AdminClientConfig config, KafkaClient client, Metadata metadata) {
static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata) {
Metrics metrics = null;
Time time = Time.SYSTEM;
String clientId = generateClientId(config);
@ -343,9 +350,33 @@ public class KafkaAdminClient extends AdminClient { @@ -343,9 +350,33 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
public void close() {
closed = true;
client.wakeup(); // Wake the thread, if it is blocked inside poll().
public void close(long duration, TimeUnit unit) {
long waitTimeMs = unit.toMillis(duration);
waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365), waitTimeMs); // Limit the timeout to a year.
long now = time.milliseconds();
long newHardShutdownTimeMs = now + waitTimeMs;
long prev = INVALID_SHUTDOWN_TIME;
while (true) {
if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
if (prev == INVALID_SHUTDOWN_TIME) {
log.debug("{}: initiating close operation.", clientId);
} else {
log.debug("{}: moving hard shutdown time forward.", clientId);
}
client.wakeup(); // Wake the thread, if it is blocked inside poll().
break;
}
prev = hardShutdownTimeMs.get();
if (prev < newHardShutdownTimeMs) {
log.debug("{}: hard shutdown time is already earlier than requested.", clientId);
newHardShutdownTimeMs = prev;
break;
}
}
if (log.isDebugEnabled()) {
long deltaMs = Math.max(0, newHardShutdownTimeMs - time.milliseconds());
log.debug("{}: waiting for the I/O thread to exit. Hard shutdown in {} ms.", clientId, deltaMs);
}
try {
// Wait for the thread to be joined.
thread.join();
@ -439,7 +470,7 @@ public class KafkaAdminClient extends AdminClient { @@ -439,7 +470,7 @@ public class KafkaAdminClient extends AdminClient {
if ((throwable instanceof UnsupportedVersionException) &&
handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
log.trace("{} attempting protocol downgrade.", this);
runnable.call(this, now);
runnable.enqueue(this, now);
return;
}
tries++;
@ -474,7 +505,7 @@ public class KafkaAdminClient extends AdminClient { @@ -474,7 +505,7 @@ public class KafkaAdminClient extends AdminClient {
log.debug("{} failed: {}. Beginning retry #{}",
this, prettyPrintException(throwable), tries);
}
runnable.call(this, now);
runnable.enqueue(this, now);
}
/**
@ -523,6 +554,7 @@ public class KafkaAdminClient extends AdminClient { @@ -523,6 +554,7 @@ public class KafkaAdminClient extends AdminClient {
private final class AdminClientRunnable implements Runnable {
/**
* Pending calls. Protected by the object monitor.
* This will be null only if the thread has shut down.
*/
private List<Call> newCalls = new LinkedList<>();
@ -554,47 +586,96 @@ public class KafkaAdminClient extends AdminClient { @@ -554,47 +586,96 @@ public class KafkaAdminClient extends AdminClient {
return null;
}
/**
* Time out a list of calls.
*
* @param now The current time in milliseconds.
* @param calls The collection of calls. Must be sorted from oldest to newest.
*/
private int timeoutCalls(long now, Collection<Call> calls) {
int numTimedOut = 0;
for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
Call call = iter.next();
if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
call.fail(now, new TimeoutException());
iter.remove();
numTimedOut++;
private class TimeoutProcessor {
/**
* The current time in milliseconds.
*/
private final long now;
/**
* The number of milliseconds until the next timeout.
*/
private int nextTimeoutMs;
/**
* Create a new timeout processor.
*
* @param now The current time in milliseconds since the epoch.
*/
TimeoutProcessor(long now) {
this.now = now;
this.nextTimeoutMs = Integer.MAX_VALUE;
}
/**
* Check for calls which have timed out.
* Timed out calls will be removed and failed.
* The remaining milliseconds until the next timeout will be updated.
*
* @param calls The collection of calls.
*
* @return The number of calls which were timed out.
*/
int handleTimeouts(Collection<Call> calls, String msg) {
int numTimedOut = 0;
for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
Call call = iter.next();
int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
if (remainingMs < 0) {
call.fail(now, new TimeoutException(msg));
iter.remove();
numTimedOut++;
} else {
nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs);
}
}
return numTimedOut;
}
/**
* Check whether a call should be timed out.
* The remaining milliseconds until the next timeout will be updated.
*
* @param call The call.
*
* @return True if the call should be timed out.
*/
boolean callHasExpired(Call call) {
int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
if (remainingMs < 0)
return true;
nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs);
return false;
}
int nextTimeoutMs() {
return nextTimeoutMs;
}
return numTimedOut;
}
/**
* Time out the elements in the newCalls list which are expired.
*
* @param now The current time in milliseconds.
* @param processor The timeout processor.
*/
private synchronized void timeoutNewCalls(long now) {
int numTimedOut = timeoutCalls(now, newCalls);
if (numTimedOut > 0) {
private synchronized void timeoutNewCalls(TimeoutProcessor processor) {
int numTimedOut = processor.handleTimeouts(newCalls,
"Timed out waiting for a node assignment.");
if (numTimedOut > 0)
log.debug("{}: timed out {} new calls.", clientId, numTimedOut);
}
}
/**
* Time out calls which have been assigned to nodes.
*
* @param now The current time in milliseconds.
* @param processor The timeout processor.
* @param callsToSend A map of nodes to the calls they need to handle.
*/
private void timeoutCallsToSend(long now, Map<Node, List<Call>> callsToSend) {
private void timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) {
int numTimedOut = 0;
for (List<Call> callList : callsToSend.values()) {
numTimedOut += timeoutCalls(now, callList);
numTimedOut += processor.handleTimeouts(callList,
"Timed out waiting to send the call.");
}
if (numTimedOut > 0)
log.debug("{}: timed out {} call(s) with assigned nodes.", clientId, numTimedOut);
@ -698,10 +779,10 @@ public class KafkaAdminClient extends AdminClient { @@ -698,10 +779,10 @@ public class KafkaAdminClient extends AdminClient {
* even be in the process of being processed by the remote server. At the moment, our only option
* to time them out is to close the entire connection.
*
* @param now The current time in milliseconds.
* @param callsInFlight A map of nodes to the calls they have in flight.
* @param processor The timeout processor.
* @param callsInFlight A map of nodes to the calls they have in flight.
*/
private void timeoutCallsInFlight(long now, Map<String, List<Call>> callsInFlight) {
private void timeoutCallsInFlight(TimeoutProcessor processor, Map<String, List<Call>> callsInFlight) {
int numTimedOut = 0;
for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
List<Call> contexts = entry.getValue();
@ -711,7 +792,7 @@ public class KafkaAdminClient extends AdminClient { @@ -711,7 +792,7 @@ public class KafkaAdminClient extends AdminClient {
// We assume that the first element in the list is the earliest. So it should be the
// only one we need to check the timeout for.
Call call = contexts.get(0);
if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
if (processor.callHasExpired(call)) {
log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call);
client.close(nodeId);
numTimedOut++;
@ -773,6 +854,22 @@ public class KafkaAdminClient extends AdminClient { @@ -773,6 +854,22 @@ public class KafkaAdminClient extends AdminClient {
}
}
private synchronized boolean threadShouldExit(long now, long curHardShutdownTimeMs,
Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
if (newCalls.isEmpty() && callsToSend.isEmpty() && correlationIdToCalls.isEmpty()) {
log.trace("{}: all work has been completed, and the I/O thread is now " +
"exiting.", clientId);
return true;
}
if (now > curHardShutdownTimeMs) {
log.info("{}: forcing a hard I/O thread shutdown. Requests in progress will " +
"be aborted.", clientId);
return true;
}
log.debug("{}: hard shutdown in {} ms.", clientId, curHardShutdownTimeMs - now);
return false;
}
@Override
public void run() {
/**
@ -798,18 +895,25 @@ public class KafkaAdminClient extends AdminClient { @@ -798,18 +895,25 @@ public class KafkaAdminClient extends AdminClient {
long now = time.milliseconds();
log.trace("{} thread starting", clientId);
while (true) {
// Check if the AdminClient is shutting down.
if (closed)
// Check if the AdminClient thread should shut down.
long curHardShutdownTimeMs = hardShutdownTimeMs.get();
if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
threadShouldExit(now, curHardShutdownTimeMs, callsToSend, correlationIdToCalls))
break;
// Handle timeouts.
timeoutNewCalls(now);
timeoutCallsToSend(now, callsToSend);
timeoutCallsInFlight(now, callsInFlight);
TimeoutProcessor timeoutProcessor = new TimeoutProcessor(now);
timeoutNewCalls(timeoutProcessor);
timeoutCallsToSend(timeoutProcessor, callsToSend);
timeoutCallsInFlight(timeoutProcessor, callsInFlight);
long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now);
}
// Handle new calls and metadata update requests.
prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
long pollTimeout = 1200000;
if (prevMetadataVersion == null) {
chooseNodesForNewCalls(now, callsToSend);
pollTimeout = Math.min(pollTimeout,
@ -826,10 +930,14 @@ public class KafkaAdminClient extends AdminClient { @@ -826,10 +930,14 @@ public class KafkaAdminClient extends AdminClient {
handleResponses(now, responses, callsInFlight, correlationIdToCalls);
}
int numTimedOut = 0;
TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
synchronized (this) {
numTimedOut += timeoutCalls(Long.MAX_VALUE, newCalls);
numTimedOut += timeoutProcessor.handleTimeouts(newCalls,
"The AdminClient thread has exited.");
newCalls = null;
}
numTimedOut += timeoutCalls(Long.MAX_VALUE, correlationIdToCalls.values());
numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
"The AdminClient thread has exited.");
if (numTimedOut > 0) {
log.debug("{}: timed out {} remaining operations.", clientId, numTimedOut);
}
@ -838,15 +946,51 @@ public class KafkaAdminClient extends AdminClient { @@ -838,15 +946,51 @@ public class KafkaAdminClient extends AdminClient {
log.debug("{}: exiting AdminClientRunnable thread.", clientId);
}
void call(Call call, long now) {
/**
* Queue a call for sending.
*
* If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even
* if the AdminClient is shutting down.) This function should called when retrying an
* existing call.
*
* @param call The new call object.
* @param now The current time in milliseconds.
*/
void enqueue(Call call, long now) {
if (log.isDebugEnabled()) {
log.debug("{}: queueing {} with a timeout {} ms from now.",
clientId, call, call.deadlineMs - now);
}
boolean accepted = false;
synchronized (this) {
newCalls.add(call);
if (newCalls != null) {
newCalls.add(call);
accepted = true;
}
}
if (accepted) {
client.wakeup(); // wake the thread if it is in poll()
} else {
log.debug("{}: the AdminClient thread has exited. Timing out {}.", clientId, call);
call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited."));
}
}
/**
* Initiate a new call.
*
* This will fail if the AdminClient is scheduled to shut down.
*
* @param call The new call object.
* @param now The current time in milliseconds.
*/
void call(Call call, long now) {
if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
log.debug("{}: the AdminClient is not accepting new calls. Timing out {}.", clientId, call);
call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls."));
} else {
enqueue(call, now);
}
client.wakeup();
}
}

2
clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java

@ -52,7 +52,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable { @@ -52,7 +52,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata);
}
public Cluster cluster() {

59
core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala

@ -18,9 +18,9 @@ package kafka.api @@ -18,9 +18,9 @@ package kafka.api
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.ExecutionException
import java.util.concurrent.{ExecutionException, TimeUnit}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin._ @@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException}
import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException}
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.MetadataResponse
@ -381,6 +381,59 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin @@ -381,6 +381,59 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
client.close()
}
/**
* Test closing the AdminClient with a generous timeout. Calls in progress should be completed,
* since they can be done within the timeout. New calls should receive timeouts.
*/
@Test
def testDelayedClose(): Unit = {
client = AdminClient.create(createConfig())
val topics = Seq("mytopic", "mytopic2")
val newTopics = topics.map(new NewTopic(_, 1, 1))
val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
client.close(2, TimeUnit.HOURS)
val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
future.get
client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect
}
/**
* Test closing the AdminClient with a timeout of 0, when there are calls with extremely long
* timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses.
*/
@Test
def testForceClose(): Unit = {
val config = createConfig()
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
client = AdminClient.create(config)
// Because the bootstrap servers are set up incorrectly, this call will not complete, but must be
// cancelled by the close operation.
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
new CreateTopicsOptions().timeoutMs(900000)).all()
client.close(0, TimeUnit.MILLISECONDS)
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
}
/**
* Check that a call with a timeout does not complete before the minimum timeout has elapsed,
* even when the default request timeout is shorter.
*/
@Test
def testMinimumRequestTimeouts(): Unit = {
val config = createConfig()
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
client = AdminClient.create(config)
val startTimeMs = Time.SYSTEM.milliseconds()
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
new CreateTopicsOptions().timeoutMs(2)).all()
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
val endTimeMs = Time.SYSTEM.milliseconds()
assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
client.close()
}
override def generateConfigs() = {
val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)

Loading…
Cancel
Save