Browse Source

MINOR: Consolidate Utils.newThread, Utils.daemonThread and KafkaThread

Removed the first two in favour of the latter.

Author: Kamal C <kamal.chandraprakash@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3350 from Kamal15/utilcleanup
pull/3351/merge
Kamal C 8 years ago committed by Ismael Juma
parent
commit
6f1d4c6933
  1. 3
      clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
  2. 6
      clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
  3. 12
      clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
  4. 4
      clients/src/main/java/org/apache/kafka/common/utils/Shell.java
  5. 28
      clients/src/main/java/org/apache/kafka/common/utils/Utils.java
  6. 8
      core/src/main/scala/kafka/network/SocketServer.scala
  7. 4
      core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  8. 6
      core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
  9. 4
      core/src/main/scala/kafka/utils/CoreUtils.scala
  10. 4
      core/src/main/scala/kafka/utils/KafkaScheduler.scala
  11. 4
      core/src/main/scala/kafka/utils/timer/Timer.scala

3
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java

@ -18,6 +18,7 @@ package org.apache.kafka.common.metrics; @@ -18,6 +18,7 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@ -142,7 +143,7 @@ public class Metrics implements Closeable { @@ -142,7 +143,7 @@ public class Metrics implements Closeable {
// Creating a daemon thread to not block shutdown
this.metricsScheduler.setThreadFactory(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
return Utils.newThread("SensorExpiryThread", runnable, true);
return KafkaThread.daemon("SensorExpiryThread", runnable);
}
});
this.metricsScheduler.scheduleAtFixedRate(new ExpireSensorTask(), 30, 30, TimeUnit.SECONDS);

6
clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java

@ -27,9 +27,9 @@ import org.apache.kafka.common.security.JaasContext; @@ -27,9 +27,9 @@ import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.authenticator.AbstractLogin;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Shell;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -129,7 +129,7 @@ public class KerberosLogin extends AbstractLogin { @@ -129,7 +129,7 @@ public class KerberosLogin extends AbstractLogin {
// TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
// you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
// "modprinc -maxlife 3mins <principal>" in kadmin.
t = Utils.newThread(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
public void run() {
log.info("[Principal={}]: TGT refresh thread started.", principal);
while (true) { // renewal thread's main loop. if it exits from here, thread will exit.
@ -253,7 +253,7 @@ public class KerberosLogin extends AbstractLogin { @@ -253,7 +253,7 @@ public class KerberosLogin extends AbstractLogin {
}
}
}
}, true);
});
t.start();
return loginContext;
}

12
clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java

@ -25,6 +25,14 @@ import org.slf4j.LoggerFactory; @@ -25,6 +25,14 @@ import org.slf4j.LoggerFactory;
public class KafkaThread extends Thread {
private final Logger log = LoggerFactory.getLogger(getClass());
public static KafkaThread daemon(final String name, Runnable runnable) {
return new KafkaThread(name, runnable, true);
}
public static KafkaThread nonDaemon(final String name, Runnable runnable) {
return new KafkaThread(name, runnable, false);
}
public KafkaThread(final String name, boolean daemon) {
super(name);
@ -38,9 +46,9 @@ public class KafkaThread extends Thread { @@ -38,9 +46,9 @@ public class KafkaThread extends Thread {
private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.error("Uncaught exception in " + name + ": ", e);
log.error("Uncaught exception in thread '{}':", name, e);
}
});
}

4
clients/src/main/java/org/apache/kafka/common/utils/Shell.java

@ -96,7 +96,7 @@ abstract public class Shell { @@ -96,7 +96,7 @@ abstract public class Shell {
// read error and input streams as this would free up the buffers
// free the error stream buffer
Thread errThread = Utils.newThread("kafka-shell-thread", new Runnable() {
Thread errThread = KafkaThread.nonDaemon("kafka-shell-thread", new Runnable() {
@Override
public void run() {
try {
@ -110,7 +110,7 @@ abstract public class Shell { @@ -110,7 +110,7 @@ abstract public class Shell {
LOG.warn("Error reading the error stream", ioe);
}
}
}, false);
});
errThread.start();
try {

28
clients/src/main/java/org/apache/kafka/common/utils/Utils.java

@ -436,34 +436,6 @@ public class Utils { @@ -436,34 +436,6 @@ public class Utils {
return sw.toString();
}
/**
* Create a new thread
* @param name The name of the thread
* @param runnable The work for the thread to do
* @param daemon Should the thread block JVM shutdown?
* @return The unstarted thread
*/
public static Thread newThread(String name, Runnable runnable, boolean daemon) {
Thread thread = new Thread(runnable, name);
thread.setDaemon(daemon);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.error("Uncaught exception in thread '{}':", t.getName(), e);
}
});
return thread;
}
/**
* Create a daemon thread
* @param name The name of the thread
* @param runnable The runnable to execute in the background
* @return The unstarted thread
*/
public static Thread daemonThread(String name, Runnable runnable) {
return newThread(name, runnable, true);
}
/**
* Print an error message and shutdown the JVM
* @param message The error message

8
core/src/main/scala/kafka/network/SocketServer.scala

@ -37,7 +37,7 @@ import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerN @@ -37,7 +37,7 @@ import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerN
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.protocol.types.SchemaException
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.{KafkaThread, Time}
import scala.collection._
import JavaConverters._
@ -91,7 +91,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time @@ -91,7 +91,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
@ -253,8 +253,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint, @@ -253,8 +253,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
this.synchronized {
processors.foreach { processor =>
Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor, false).start()
KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor).start()
}
}

4
core/src/main/scala/kafka/server/KafkaRequestHandler.scala

@ -24,7 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -24,7 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.{KafkaThread, Time}
/**
* A thread that answers kafka requests.
@ -92,7 +92,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, @@ -92,7 +92,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
KafkaThread.daemon("kafka-request-handler-" + i, runnables(i)).start()
}
def shutdown() {

6
core/src/main/scala/kafka/tools/SimpleConsumerShell.scala

@ -27,7 +27,7 @@ import kafka.cluster.BrokerEndPoint @@ -27,7 +27,7 @@ import kafka.cluster.BrokerEndPoint
import scala.collection.JavaConverters._
import kafka.common.{MessageFormatter, TopicAndPartition}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{KafkaThread, Utils}
/**
* Command line program to dump out messages to standard out using the simple consumer
@ -201,7 +201,7 @@ object SimpleConsumerShell extends Logging { @@ -201,7 +201,7 @@ object SimpleConsumerShell extends Logging {
val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
fetchTargetBroker.port,
10000, 64*1024, clientId)
val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
val thread = KafkaThread.nonDaemon("kafka-simpleconsumer-shell", new Runnable() {
def run() {
var offset = startingOffset
var numMessagesConsumed = 0
@ -253,7 +253,7 @@ object SimpleConsumerShell extends Logging { @@ -253,7 +253,7 @@ object SimpleConsumerShell extends Logging {
info(s"Consumed $numMessagesConsumed messages")
}
}
}, false)
})
thread.start()
thread.join()
System.out.flush()

4
core/src/main/scala/kafka/utils/CoreUtils.scala

@ -32,7 +32,7 @@ import scala.collection._ @@ -32,7 +32,7 @@ import scala.collection._
import scala.collection.mutable
import kafka.cluster.EndPoint
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.{KafkaThread, Utils}
/**
* General helper functions!
@ -66,7 +66,7 @@ object CoreUtils extends Logging { @@ -66,7 +66,7 @@ object CoreUtils extends Logging {
* @return The unstarted thread
*/
def newThread(name: String, daemon: Boolean)(fun: => Unit): Thread =
Utils.newThread(name, runnable(fun), daemon)
new KafkaThread(name, runnable(fun), daemon)
/**
* Do the given action and log any exceptions thrown without rethrowing them

4
core/src/main/scala/kafka/utils/KafkaScheduler.scala

@ -19,7 +19,7 @@ package kafka.utils @@ -19,7 +19,7 @@ package kafka.utils
import java.util.concurrent._
import atomic._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.KafkaThread
/**
* A scheduler for running jobs
@ -81,7 +81,7 @@ class KafkaScheduler(val threads: Int, @@ -81,7 +81,7 @@ class KafkaScheduler(val threads: Int,
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
executor.setThreadFactory(new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
})
}
}

4
core/src/main/scala/kafka/utils/timer/Timer.scala

@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.threadsafe
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.{KafkaThread, Time}
trait Timer {
/**
@ -60,7 +60,7 @@ class SystemTimer(executorName: String, @@ -60,7 +60,7 @@ class SystemTimer(executorName: String,
// timeout timer
private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
Utils.newThread("executor-"+executorName, runnable, false)
KafkaThread.nonDaemon("executor-"+executorName, runnable)
})
private[this] val delayQueue = new DelayQueue[TimerTaskList]()

Loading…
Cancel
Save