From 6f1d4c693307d1c5d1a178d56e403bb7084d537f Mon Sep 17 00:00:00 2001 From: Kamal C Date: Fri, 16 Jun 2017 11:43:03 +0100 Subject: [PATCH] MINOR: Consolidate Utils.newThread, Utils.daemonThread and KafkaThread Removed the first two in favour of the latter. Author: Kamal C Reviewers: Ismael Juma Closes #3350 from Kamal15/utilcleanup --- .../apache/kafka/common/metrics/Metrics.java | 3 +- .../security/kerberos/KerberosLogin.java | 6 ++-- .../kafka/common/utils/KafkaThread.java | 12 ++++++-- .../org/apache/kafka/common/utils/Shell.java | 4 +-- .../org/apache/kafka/common/utils/Utils.java | 28 ------------------- .../scala/kafka/network/SocketServer.scala | 8 +++--- .../kafka/server/KafkaRequestHandler.scala | 4 +-- .../kafka/tools/SimpleConsumerShell.scala | 6 ++-- .../main/scala/kafka/utils/CoreUtils.scala | 4 +-- .../scala/kafka/utils/KafkaScheduler.scala | 4 +-- .../main/scala/kafka/utils/timer/Timer.scala | 4 +-- 11 files changed, 32 insertions(+), 51 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index c4cd6765263..0b4507b3aef 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.metrics; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -142,7 +143,7 @@ public class Metrics implements Closeable { // Creating a daemon thread to not block shutdown this.metricsScheduler.setThreadFactory(new ThreadFactory() { public Thread newThread(Runnable runnable) { - return Utils.newThread("SensorExpiryThread", runnable, true); + return KafkaThread.daemon("SensorExpiryThread", runnable); } }); this.metricsScheduler.scheduleAtFixedRate(new ExpireSensorTask(), 30, 30, TimeUnit.SECONDS); diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java index fe30a01a88d..a0aad542971 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java @@ -27,9 +27,9 @@ import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.security.authenticator.AbstractLogin; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.Shell; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,7 +129,7 @@ public class KerberosLogin extends AbstractLogin { // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development, // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running: // "modprinc -maxlife 3mins " in kadmin. - t = Utils.newThread(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() { + t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() { public void run() { log.info("[Principal={}]: TGT refresh thread started.", principal); while (true) { // renewal thread's main loop. if it exits from here, thread will exit. @@ -253,7 +253,7 @@ public class KerberosLogin extends AbstractLogin { } } } - }, true); + }); t.start(); return loginContext; } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java index 3eb025b91f6..dd634a50c2d 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java @@ -25,6 +25,14 @@ import org.slf4j.LoggerFactory; public class KafkaThread extends Thread { private final Logger log = LoggerFactory.getLogger(getClass()); + + public static KafkaThread daemon(final String name, Runnable runnable) { + return new KafkaThread(name, runnable, true); + } + + public static KafkaThread nonDaemon(final String name, Runnable runnable) { + return new KafkaThread(name, runnable, false); + } public KafkaThread(final String name, boolean daemon) { super(name); @@ -38,9 +46,9 @@ public class KafkaThread extends Thread { private void configureThread(final String name, boolean daemon) { setDaemon(daemon); - setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + setUncaughtExceptionHandler(new UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { - log.error("Uncaught exception in " + name + ": ", e); + log.error("Uncaught exception in thread '{}':", name, e); } }); } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java index 33bf3c1647a..ebfd0bacc5c 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Shell.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Shell.java @@ -96,7 +96,7 @@ abstract public class Shell { // read error and input streams as this would free up the buffers // free the error stream buffer - Thread errThread = Utils.newThread("kafka-shell-thread", new Runnable() { + Thread errThread = KafkaThread.nonDaemon("kafka-shell-thread", new Runnable() { @Override public void run() { try { @@ -110,7 +110,7 @@ abstract public class Shell { LOG.warn("Error reading the error stream", ioe); } } - }, false); + }); errThread.start(); try { diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 9fbc387664a..21fbaf4ea3f 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -436,34 +436,6 @@ public class Utils { return sw.toString(); } - /** - * Create a new thread - * @param name The name of the thread - * @param runnable The work for the thread to do - * @param daemon Should the thread block JVM shutdown? - * @return The unstarted thread - */ - public static Thread newThread(String name, Runnable runnable, boolean daemon) { - Thread thread = new Thread(runnable, name); - thread.setDaemon(daemon); - thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - public void uncaughtException(Thread t, Throwable e) { - log.error("Uncaught exception in thread '{}':", t.getName(), e); - } - }); - return thread; - } - - /** - * Create a daemon thread - * @param name The name of the thread - * @param runnable The runnable to execute in the background - * @return The unstarted thread - */ - public static Thread daemonThread(String name, Runnable runnable) { - return newThread(name, runnable, true); - } - /** * Print an error message and shutdown the JVM * @param message The error message diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 414557ea8dd..9088eb5c2a3 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerN import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{KafkaThread, Time} import scala.collection._ import JavaConverters._ @@ -91,7 +91,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) acceptors.put(endpoint, acceptor) - Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start() + KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() acceptor.awaitStartup() processorBeginIndex = processorEndIndex @@ -253,8 +253,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint, this.synchronized { processors.foreach { processor => - Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", - processor, false).start() + KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", + processor).start() } } diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 9983e3d36dd..feb07b81e7b 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -24,7 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import com.yammer.metrics.core.Meter import org.apache.kafka.common.internals.FatalExitError -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{KafkaThread, Time} /** * A thread that answers kafka requests. @@ -92,7 +92,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, val runnables = new Array[KafkaRequestHandler](numThreads) for(i <- 0 until numThreads) { runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time) - Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start() + KafkaThread.daemon("kafka-request-handler-" + i, runnables(i)).start() } def shutdown() { diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index b3643a3c7b0..da8b698f138 100755 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -27,7 +27,7 @@ import kafka.cluster.BrokerEndPoint import scala.collection.JavaConverters._ import kafka.common.{MessageFormatter, TopicAndPartition} import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{KafkaThread, Utils} /** * Command line program to dump out messages to standard out using the simple consumer @@ -201,7 +201,7 @@ object SimpleConsumerShell extends Logging { val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId) - val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() { + val thread = KafkaThread.nonDaemon("kafka-simpleconsumer-shell", new Runnable() { def run() { var offset = startingOffset var numMessagesConsumed = 0 @@ -253,7 +253,7 @@ object SimpleConsumerShell extends Logging { info(s"Consumed $numMessagesConsumed messages") } } - }, false) + }) thread.start() thread.join() System.out.flush() diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 9ac95b5b9c1..0e8855cbc6e 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -32,7 +32,7 @@ import scala.collection._ import scala.collection.mutable import kafka.cluster.EndPoint import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{KafkaThread, Utils} /** * General helper functions! @@ -66,7 +66,7 @@ object CoreUtils extends Logging { * @return The unstarted thread */ def newThread(name: String, daemon: Boolean)(fun: => Unit): Thread = - Utils.newThread(name, runnable(fun), daemon) + new KafkaThread(name, runnable(fun), daemon) /** * Do the given action and log any exceptions thrown without rethrowing them diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index ef60c451048..8e130cfab47 100755 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -19,7 +19,7 @@ package kafka.utils import java.util.concurrent._ import atomic._ -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.KafkaThread /** * A scheduler for running jobs @@ -81,7 +81,7 @@ class KafkaScheduler(val threads: Int, executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) executor.setThreadFactory(new ThreadFactory() { def newThread(runnable: Runnable): Thread = - Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon) + new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon) }) } } diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index 05382712910..ae8caff8636 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.utils.threadsafe -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.utils.{KafkaThread, Time} trait Timer { /** @@ -60,7 +60,7 @@ class SystemTimer(executorName: String, // timeout timer private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() { def newThread(runnable: Runnable): Thread = - Utils.newThread("executor-"+executorName, runnable, false) + KafkaThread.nonDaemon("executor-"+executorName, runnable) }) private[this] val delayQueue = new DelayQueue[TimerTaskList]()