From 3250cc767e8ef79d0160312a8b605535d3002851 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Wed, 31 May 2017 03:02:26 +0100 Subject: [PATCH] KAFKA-5324; AdminClient: add close with timeout, fix some timeout bugs Author: Colin P. Mccabe Reviewers: Ismael Juma Closes #3141 from cmccabe/KAFKA-5324 --- .../kafka/clients/admin/AdminClient.java | 29 ++- .../kafka/clients/admin/KafkaAdminClient.java | 240 ++++++++++++++---- .../admin/MockKafkaAdminClientEnv.java | 2 +- .../api/KafkaAdminClientIntegrationTest.scala | 59 ++++- 4 files changed, 273 insertions(+), 57 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 4cfc1745f6e..96b8ebbbdd5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collection; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; /** * The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics, @@ -36,8 +37,8 @@ public abstract class AdminClient implements AutoCloseable { /** * Create a new AdminClient with the given configuration. * - * @param props The configuration. - * @return The new KafkaAdminClient. + * @param props The configuration. + * @return The new KafkaAdminClient. */ public static AdminClient create(Properties props) { return KafkaAdminClient.createInternal(new AdminClientConfig(props)); @@ -46,8 +47,8 @@ public abstract class AdminClient implements AutoCloseable { /** * Create a new AdminClient with the given configuration. * - * @param conf The configuration. - * @return The new KafkaAdminClient. + * @param conf The configuration. + * @return The new KafkaAdminClient. 
*/ public static AdminClient create(Map conf) { return KafkaAdminClient.createInternal(new AdminClientConfig(conf)); @@ -55,8 +56,26 @@ public abstract class AdminClient implements AutoCloseable { /** * Close the AdminClient and release all associated resources. + * + * See {@link AdminClient#close(long, TimeUnit)} + */ + @Override + public void close() { + close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } + + /** + * Close the AdminClient and release all associated resources. + * + * The close operation has a grace period during which current operations will be allowed to + * complete, specified by the given duration and time unit. + * New operations will not be accepted during the grace period. Once the grace period is over, + * all operations that have not yet been completed will be aborted with a TimeoutException. + * + * @param duration The duration to use for the wait time. + * @param unit The time unit to use for the wait time. */ - public abstract void close(); + public abstract void close(long duration, TimeUnit unit); /** * Create a batch of new topics with the default options. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 355dc9c8aea..98fc3f3b3fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -97,6 +97,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -123,6 +124,11 @@ public class KafkaAdminClient extends AdminClient { */ private static final String JMX_PREFIX = "kafka.admin.client"; + /** + * An invalid shutdown time which indicates that a shutdown has not yet been performed. 
+ */ + private static final long INVALID_SHUTDOWN_TIME = -1; + /** * The default timeout to use for an operation. */ @@ -164,9 +170,10 @@ public class KafkaAdminClient extends AdminClient { private final Thread thread; /** - * True if this client is closed. + * During a close operation, this is the time at which we will time out all pending operations + * and force the RPC thread to exit. If the admin client is not closing, this will be INVALID_SHUTDOWN_TIME. */ - private volatile boolean closed = false; + private final AtomicLong hardShutdownTimeMs = new AtomicLong(INVALID_SHUTDOWN_TIME); /** * Get or create a list value from a map. @@ -289,12 +296,12 @@ public class KafkaAdminClient extends AdminClient { selector, metadata, clientId, - 100, + 1, config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG), config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG), - config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), + (int) TimeUnit.HOURS.toMillis(1), time, true, apiVersions); @@ -309,7 +316,7 @@ public class KafkaAdminClient extends AdminClient { } } - static KafkaAdminClient create(AdminClientConfig config, KafkaClient client, Metadata metadata) { + static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata) { Metrics metrics = null; Time time = Time.SYSTEM; String clientId = generateClientId(config); @@ -343,9 +350,33 @@ public class KafkaAdminClient extends AdminClient { } @Override - public void close() { - closed = true; - client.wakeup(); // Wake the thread, if it is blocked inside poll(). + public void close(long duration, TimeUnit unit) { + long waitTimeMs = unit.toMillis(duration); + waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365), waitTimeMs); // Limit the timeout to a year. 
+ long now = time.milliseconds(); + long newHardShutdownTimeMs = now + waitTimeMs; + long prev = INVALID_SHUTDOWN_TIME; + while (true) { + if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) { + if (prev == INVALID_SHUTDOWN_TIME) { + log.debug("{}: initiating close operation.", clientId); + } else { + log.debug("{}: moving hard shutdown time forward.", clientId); + } + client.wakeup(); // Wake the thread, if it is blocked inside poll(). + break; + } + prev = hardShutdownTimeMs.get(); + if (prev < newHardShutdownTimeMs) { + log.debug("{}: hard shutdown time is already earlier than requested.", clientId); + newHardShutdownTimeMs = prev; + break; + } + } + if (log.isDebugEnabled()) { + long deltaMs = Math.max(0, newHardShutdownTimeMs - time.milliseconds()); + log.debug("{}: waiting for the I/O thread to exit. Hard shutdown in {} ms.", clientId, deltaMs); + } try { // Wait for the thread to be joined. thread.join(); @@ -439,7 +470,7 @@ public class KafkaAdminClient extends AdminClient { if ((throwable instanceof UnsupportedVersionException) && handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { log.trace("{} attempting protocol downgrade.", this); - runnable.call(this, now); + runnable.enqueue(this, now); return; } tries++; @@ -474,7 +505,7 @@ public class KafkaAdminClient extends AdminClient { log.debug("{} failed: {}. Beginning retry #{}", this, prettyPrintException(throwable), tries); } - runnable.call(this, now); + runnable.enqueue(this, now); } /** @@ -523,6 +554,7 @@ public class KafkaAdminClient extends AdminClient { private final class AdminClientRunnable implements Runnable { /** * Pending calls. Protected by the object monitor. + * This will be null only if the thread has shut down. */ private List newCalls = new LinkedList<>(); @@ -554,47 +586,96 @@ public class KafkaAdminClient extends AdminClient { return null; } - /** - * Time out a list of calls. - * - * @param now The current time in milliseconds. 
- * @param calls The collection of calls. Must be sorted from oldest to newest. - */ - private int timeoutCalls(long now, Collection calls) { - int numTimedOut = 0; - for (Iterator iter = calls.iterator(); iter.hasNext(); ) { - Call call = iter.next(); - if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) { - call.fail(now, new TimeoutException()); - iter.remove(); - numTimedOut++; + private class TimeoutProcessor { + /** + * The current time in milliseconds. + */ + private final long now; + + /** + * The number of milliseconds until the next timeout. + */ + private int nextTimeoutMs; + + /** + * Create a new timeout processor. + * + * @param now The current time in milliseconds since the epoch. + */ + TimeoutProcessor(long now) { + this.now = now; + this.nextTimeoutMs = Integer.MAX_VALUE; + } + + /** + * Check for calls which have timed out. + * Timed out calls will be removed and failed. + * The remaining milliseconds until the next timeout will be updated. + * + * @param calls The collection of calls. + * + * @return The number of calls which were timed out. + */ + int handleTimeouts(Collection calls, String msg) { + int numTimedOut = 0; + for (Iterator iter = calls.iterator(); iter.hasNext(); ) { + Call call = iter.next(); + int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs); + if (remainingMs < 0) { + call.fail(now, new TimeoutException(msg)); + iter.remove(); + numTimedOut++; + } else { + nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs); + } } + return numTimedOut; + } + + /** + * Check whether a call should be timed out. + * The remaining milliseconds until the next timeout will be updated. + * + * @param call The call. + * + * @return True if the call should be timed out. 
+ */ + boolean callHasExpired(Call call) { + int remainingMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs); + if (remainingMs < 0) + return true; + nextTimeoutMs = Math.min(nextTimeoutMs, remainingMs); + return false; + } + + int nextTimeoutMs() { + return nextTimeoutMs; } - return numTimedOut; } /** * Time out the elements in the newCalls list which are expired. * - * @param now The current time in milliseconds. + * @param processor The timeout processor. */ - private synchronized void timeoutNewCalls(long now) { - int numTimedOut = timeoutCalls(now, newCalls); - if (numTimedOut > 0) { + private synchronized void timeoutNewCalls(TimeoutProcessor processor) { + int numTimedOut = processor.handleTimeouts(newCalls, + "Timed out waiting for a node assignment."); + if (numTimedOut > 0) log.debug("{}: timed out {} new calls.", clientId, numTimedOut); - } } /** * Time out calls which have been assigned to nodes. * - * @param now The current time in milliseconds. + * @param processor The timeout processor. * @param callsToSend A map of nodes to the calls they need to handle. */ - private void timeoutCallsToSend(long now, Map> callsToSend) { + private void timeoutCallsToSend(TimeoutProcessor processor, Map> callsToSend) { int numTimedOut = 0; for (List callList : callsToSend.values()) { - numTimedOut += timeoutCalls(now, callList); + numTimedOut += processor.handleTimeouts(callList, + "Timed out waiting to send the call."); } if (numTimedOut > 0) log.debug("{}: timed out {} call(s) with assigned nodes.", clientId, numTimedOut); @@ -698,10 +779,10 @@ public class KafkaAdminClient extends AdminClient { * even be in the process of being processed by the remote server. At the moment, our only option * to time them out is to close the entire connection. * - * @param now The current time in milliseconds. - * @param callsInFlight A map of nodes to the calls they have in flight. + * @param processor The timeout processor. 
+ * @param callsInFlight A map of nodes to the calls they have in flight. */ - private void timeoutCallsInFlight(long now, Map> callsInFlight) { + private void timeoutCallsInFlight(TimeoutProcessor processor, Map> callsInFlight) { int numTimedOut = 0; for (Map.Entry> entry : callsInFlight.entrySet()) { List contexts = entry.getValue(); @@ -711,7 +792,7 @@ public class KafkaAdminClient extends AdminClient { // We assume that the first element in the list is the earliest. So it should be the // only one we need to check the timeout for. Call call = contexts.get(0); - if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) { + if (processor.callHasExpired(call)) { log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call); client.close(nodeId); numTimedOut++; @@ -773,6 +854,22 @@ public class KafkaAdminClient extends AdminClient { } } + private synchronized boolean threadShouldExit(long now, long curHardShutdownTimeMs, + Map> callsToSend, Map correlationIdToCalls) { + if (newCalls.isEmpty() && callsToSend.isEmpty() && correlationIdToCalls.isEmpty()) { + log.trace("{}: all work has been completed, and the I/O thread is now " + + "exiting.", clientId); + return true; + } + if (now > curHardShutdownTimeMs) { + log.info("{}: forcing a hard I/O thread shutdown. Requests in progress will " + + "be aborted.", clientId); + return true; + } + log.debug("{}: hard shutdown in {} ms.", clientId, curHardShutdownTimeMs - now); + return false; + } + @Override public void run() { /** @@ -798,18 +895,25 @@ public class KafkaAdminClient extends AdminClient { long now = time.milliseconds(); log.trace("{} thread starting", clientId); while (true) { - // Check if the AdminClient is shutting down. - if (closed) + // Check if the AdminClient thread should shut down. 
+ long curHardShutdownTimeMs = hardShutdownTimeMs.get(); + if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && + threadShouldExit(now, curHardShutdownTimeMs, callsToSend, correlationIdToCalls)) break; // Handle timeouts. - timeoutNewCalls(now); - timeoutCallsToSend(now, callsToSend); - timeoutCallsInFlight(now, callsInFlight); + TimeoutProcessor timeoutProcessor = new TimeoutProcessor(now); + timeoutNewCalls(timeoutProcessor); + timeoutCallsToSend(timeoutProcessor, callsToSend); + timeoutCallsInFlight(timeoutProcessor, callsInFlight); + + long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs()); + if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) { + pollTimeout = Math.min(pollTimeout, curHardShutdownTimeMs - now); + } // Handle new calls and metadata update requests. prevMetadataVersion = checkMetadataReady(prevMetadataVersion); - long pollTimeout = 1200000; if (prevMetadataVersion == null) { chooseNodesForNewCalls(now, callsToSend); pollTimeout = Math.min(pollTimeout, @@ -826,10 +930,14 @@ public class KafkaAdminClient extends AdminClient { handleResponses(now, responses, callsInFlight, correlationIdToCalls); } int numTimedOut = 0; + TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE); synchronized (this) { - numTimedOut += timeoutCalls(Long.MAX_VALUE, newCalls); + numTimedOut += timeoutProcessor.handleTimeouts(newCalls, + "The AdminClient thread has exited."); + newCalls = null; } - numTimedOut += timeoutCalls(Long.MAX_VALUE, correlationIdToCalls.values()); + numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(), + "The AdminClient thread has exited."); if (numTimedOut > 0) { log.debug("{}: timed out {} remaining operations.", clientId, numTimedOut); } @@ -838,15 +946,51 @@ public class KafkaAdminClient extends AdminClient { log.debug("{}: exiting AdminClientRunnable thread.", clientId); } - void call(Call call, long now) { + /** + * Queue a call for sending. 
+ * + * If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even + * if the AdminClient is shutting down.) This function should be called when retrying an + * existing call. + * + * @param call The new call object. + * @param now The current time in milliseconds. + */ + void enqueue(Call call, long now) { if (log.isDebugEnabled()) { log.debug("{}: queueing {} with a timeout {} ms from now.", clientId, call, call.deadlineMs - now); } + boolean accepted = false; synchronized (this) { - newCalls.add(call); + if (newCalls != null) { + newCalls.add(call); + accepted = true; + } + } + if (accepted) { + client.wakeup(); // wake the thread if it is in poll() + } else { + log.debug("{}: the AdminClient thread has exited. Timing out {}.", clientId, call); + call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread has exited.")); + } + } + + /** + * Initiate a new call. + * + * This will fail if the AdminClient is scheduled to shut down. + * + * @param call The new call object. + * @param now The current time in milliseconds. + */ + void call(Call call, long now) { + if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) { + log.debug("{}: the AdminClient is not accepting new calls. 
Timing out {}.", clientId, call); + call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls.")); + } else { + enqueue(call, now); } - client.wakeup(); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java index 1fa02490b1b..ba7b528926f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java @@ -52,7 +52,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable { this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); this.mockClient = new MockClient(Time.SYSTEM, this.metadata); - this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata); + this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata); } public Cluster cluster() { diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala index 81f5c27c829..c52594b6427 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala @@ -18,9 +18,9 @@ package kafka.api import java.util import java.util.{Collections, Properties} -import java.util.concurrent.ExecutionException +import java.util.concurrent.{ExecutionException, TimeUnit} -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Time, Utils} import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig import kafka.server.{Defaults, KafkaConfig} @@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin._ import kafka.utils.{Logging, 
TestUtils} import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture -import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException} +import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException} import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Rule, Test} import org.apache.kafka.common.requests.MetadataResponse @@ -381,6 +381,59 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin client.close() } + /** + * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, + * since they can be done within the timeout. New calls should receive timeouts. + */ + @Test + def testDelayedClose(): Unit = { + client = AdminClient.create(createConfig()) + val topics = Seq("mytopic", "mytopic2") + val newTopics = topics.map(new NewTopic(_, 1, 1)) + val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() + client.close(2, TimeUnit.HOURS) + val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() + assertFutureExceptionTypeEquals(future2, classOf[TimeoutException]) + future.get + client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect + } + + /** + * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long + * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. + */ + @Test + def testForceClose(): Unit = { + val config = createConfig() + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22") + client = AdminClient.create(config) + // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be + // cancelled by the close operation. 
+ val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, + new CreateTopicsOptions().timeoutMs(900000)).all() + client.close(0, TimeUnit.MILLISECONDS) + assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) + } + + /** + * Check that a call with a timeout does not complete before the minimum timeout has elapsed, + * even when the default request timeout is shorter. + */ + @Test + def testMinimumRequestTimeouts(): Unit = { + val config = createConfig() + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22") + config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") + client = AdminClient.create(config) + val startTimeMs = Time.SYSTEM.milliseconds() + val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, + new CreateTopicsOptions().timeoutMs(2)).all() + assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) + val endTimeMs = Time.SYSTEM.milliseconds() + assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs); + client.close() + } + override def generateConfigs() = { val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)