diff --git a/core/src/main/scala/kafka/common/UnknownCodecException.scala b/core/src/main/scala/kafka/common/UnknownCodecException.scala
deleted file mode 100644
index 7e669019c32..00000000000
--- a/core/src/main/scala/kafka/common/UnknownCodecException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Indicates the client has requested a range no longer available on the server
- */
-class UnknownCodecException(message: String) extends RuntimeException(message) {
- def this() = this(null)
-}
-
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 450fcfea461..935599048da 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -17,6 +17,7 @@
package kafka.server
+import com.yammer.metrics.core.Meter
import kafka.common.ClientIdAndBroker
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.CoreUtils.inLock
@@ -759,7 +760,7 @@ abstract class AbstractFetcherThread(name: String,
leaderEpochInRequest: Optional[Integer],
fetchPartitionData: PartitionData): Boolean = {
try {
- val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData);
+ val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData)
// TODO: use fetchTierStateMachine.maybeAdvanceState when implementing async tiering logic in KAFKA-13560
@@ -879,7 +880,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
lagVal.set(newLag)
}
- def lag = lagVal.get
+ def lag: Long = lagVal.get
def unregister(): Unit = {
metricsGroup.removeMetric(FetcherMetrics.ConsumerLag, tags)
@@ -909,13 +910,13 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
class FetcherStats(metricId: ClientIdAndBroker) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
- val tags = Map("clientId" -> metricId.clientId,
+ val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
"brokerHost" -> metricId.brokerHost,
"brokerPort" -> metricId.brokerPort.toString).asJava
- val requestRate = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+ val requestRate: Meter = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
- val byteRate = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags)
+ val byteRate: Meter = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags)
def unregister(): Unit = {
metricsGroup.removeMetric(FetcherMetrics.RequestsPerSec, tags)
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index c17fe7c0c38..10074d4a5f5 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -252,7 +252,7 @@ class BrokerLifecycleManager(
* Start shutting down the BrokerLifecycleManager, but do not block.
*/
def beginShutdown(): Unit = {
- eventQueue.beginShutdown("beginShutdown");
+ eventQueue.beginShutdown("beginShutdown")
}
/**
@@ -483,7 +483,7 @@ class BrokerLifecycleManager(
override def run(): Unit = {
if (!initialRegistrationSucceeded) {
error("Shutting down because we were unable to register with the controller quorum.")
- eventQueue.beginShutdown("registrationTimeout");
+ eventQueue.beginShutdown("registrationTimeout")
}
}
}
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index 6e57d97bc3e..d330210b9d5 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -28,11 +28,10 @@ import org.apache.kafka.server.quota.ClientQuotaCallback
import scala.jdk.CollectionConverters._
object ClientRequestQuotaManager {
- val QuotaRequestPercentDefault = Int.MaxValue.toDouble
- val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
+ val NanosToPercentagePerSecond: Double = 100.0 / TimeUnit.SECONDS.toNanos(1)
// Since exemptSensor is for all clients and has a constant name, we do not expire exemptSensor and only
// create once.
- val DefaultInactiveExemptSensorExpirationTimeSeconds = Long.MaxValue
+ val DefaultInactiveExemptSensorExpirationTimeSeconds: Long = Long.MaxValue
private val ExemptSensorName = "exempt-" + QuotaType.Request
}
diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index ccdd0ac31af..e095ecdc051 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -46,7 +46,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
private def validateTopicName(
name: String
): Unit = {
- if (name.isEmpty()) {
+ if (name.isEmpty) {
throw new InvalidRequestException("Default topic resources are not allowed.")
}
Topic.validate(name)
@@ -55,7 +55,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
private def validateBrokerName(
name: String
): Unit = {
- if (!name.isEmpty()) {
+ if (name.nonEmpty) {
val brokerId = try {
Integer.valueOf(name)
} catch {
@@ -96,10 +96,10 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
val properties = new Properties()
val nullTopicConfigs = new mutable.ArrayBuffer[String]()
config.entrySet().forEach(e => {
- if (e.getValue() == null) {
- nullTopicConfigs += e.getKey()
+ if (e.getValue == null) {
+ nullTopicConfigs += e.getKey
} else {
- properties.setProperty(e.getKey(), e.getValue())
+ properties.setProperty(e.getKey, e.getValue)
}
})
if (nullTopicConfigs.nonEmpty) {
diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index f011a6b3663..e21c4699bcd 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -132,7 +132,6 @@ class PermissiveControllerMutationQuota(private val time: Time,
}
object ControllerMutationQuotaManager {
- val QuotaControllerMutationDefault = Int.MaxValue.toDouble
/**
* This calculates the amount of time needed to bring the TokenBucket within quota
diff --git a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index c5868c05732..ae717bce624 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -143,7 +143,7 @@ class ControllerRegistrationManager(
* Start shutting down the ControllerRegistrationManager, but do not block.
*/
def beginShutdown(): Unit = {
- eventQueue.beginShutdown("beginShutdown");
+ eventQueue.beginShutdown("beginShutdown")
}
/**
@@ -206,7 +206,7 @@ class ControllerRegistrationManager(
info("maybeSendControllerRegistration: cannot register yet because the metadata version is " +
s"still $metadataVersion, which does not support KIP-919 controller registration.")
} else if (pendingRpc) {
- info("maybeSendControllerRegistration: waiting for the previous RPC to complete.");
+ info("maybeSendControllerRegistration: waiting for the previous RPC to complete.")
} else {
sendControllerRegistration()
}
@@ -224,7 +224,7 @@ class ControllerRegistrationManager(
setControllerId(nodeId).
setFeatures(features).
setIncarnationId(incarnationId).
- setListeners(listenerInfo.toControllerRegistrationRequest())
+ setListeners(listenerInfo.toControllerRegistrationRequest)
info(s"sendControllerRegistration: attempting to send $data")
_channelManager.sendRequest(new ControllerRegistrationRequest.Builder(data),
new RegistrationResponseHandler())
@@ -274,7 +274,7 @@ class ControllerRegistrationManager(
}
private def scheduleNextCommunication(intervalMs: Long): Unit = {
- trace(s"Scheduling next communication at ${intervalMs} ms from now.")
+ trace(s"Scheduling next communication at $intervalMs ms from now.")
val deadlineNs = time.nanoseconds() + MILLISECONDS.toNanos(intervalMs)
eventQueue.scheduleDeferred("communication",
new DeadlineFunction(deadlineNs),
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 9ce6082e76c..f8b60b6071d 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -17,6 +17,8 @@
package kafka.server
+import com.yammer.metrics.core.Meter
+
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
@@ -184,7 +186,7 @@ class DelayedFetch(
object DelayedFetchMetrics {
private val metricsGroup = new KafkaMetricsGroup(DelayedFetchMetrics.getClass)
private val FetcherTypeKey = "fetcherType"
- val followerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
- val consumerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
+ val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
+ val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
}
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 48a17442e0e..629199148a8 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -167,7 +167,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
* Return all the current watcher lists,
* note that the returned watchers may be removed from the list by other threads
*/
- def allWatchers = {
+ def allWatchers: Iterable[Watchers] = {
watchersByKey.values
}
}
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 2f1261ada23..7a21a86260c 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -34,7 +34,7 @@ import scala.jdk.CollectionConverters._
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
@volatile var acksPending = false
- override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
+ override def toString: String = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]"
}
@@ -62,7 +62,7 @@ class DelayedProduce(delayMs: Long,
lockOpt: Option[Lock] = None)
extends DelayedOperation(delayMs, lockOpt) {
- override lazy val logger = DelayedProduce.logger
+ override lazy val logger: Logger = DelayedProduce.logger
// first update the acks pending variable according to the error code
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 618faeee790..485d76f7661 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -83,7 +83,7 @@ object Defaults {
val BrokerHeartbeatIntervalMs = 2000
val BrokerSessionTimeoutMs = 9000
val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
- val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1);
+ val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1)
val MetadataMaxIdleIntervalMs = 500
val MetadataMaxRetentionBytes = 100 * 1024 * 1024
val DeleteTopicEnable = true
@@ -224,7 +224,7 @@ object Defaults {
val MetricNumSamples = 2
val MetricSampleWindowMs = 30000
val MetricReporterClasses = ""
- val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
+ val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString
val AutoIncludeJmxReporter = true
@@ -241,7 +241,7 @@ object Defaults {
val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
val SslEndpointIdentificationAlgorithm = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
val SslClientAuthentication = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT)
- val SslClientAuthenticationValidValues = SslClientAuth.VALUES.asScala.map(v => v.toString().toLowerCase(Locale.ROOT)).asJava.toArray(new Array[String](0))
+ val SslClientAuthenticationValidValues = SslClientAuth.VALUES.asScala.map(v => v.toString.toLowerCase(Locale.ROOT)).asJava.toArray(new Array[String](0))
val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
/** ********* General Security configuration ***********/
@@ -754,7 +754,7 @@ object KafkaConfig {
"maximum bytes limit is reached."
val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often the active " +
"controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
- s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}";
+ s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}"
val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the controller. This is required " +
"if running in KRaft mode. When communicating with the controller quorum, the broker will always use the first listener in this list.\n " +
"Note: The ZooKeeper-based controller should not set this configuration."
@@ -886,7 +886,7 @@ object KafkaConfig {
val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound " +
"on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise " +
- "tombstones messages may be collected before a consumer completes their scan).";
+ "tombstones messages may be collected before a consumer completes their scan)."
val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted."
val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
@@ -915,13 +915,13 @@ object KafkaConfig {
"broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's " +
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
- "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
+ "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
val LogMessageTimestampAfterMaxMsDoc = "This configuration sets the allowable timestamp difference between the " +
"message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
- "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
+ "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server."
@@ -939,7 +939,7 @@ object KafkaConfig {
"implement the org.apache.kafka.server.policy.CreateTopicPolicy
interface."
val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " +
"implement the org.apache.kafka.server.policy.AlterConfigPolicy
interface."
- val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC;
+ val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels."
@@ -1949,7 +1949,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// A user-supplied IBP was given
val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
if (!configuredVersion.isKRaftSupported) {
- throw new ConfigException(s"A non-KRaft version ${interBrokerProtocolVersionString} given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
+ throw new ConfigException(s"A non-KRaft version $interBrokerProtocolVersionString given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
} else {
warn(s"${KafkaConfig.InterBrokerProtocolVersionProp} is deprecated in KRaft mode as of 3.3 and will only " +
@@ -1968,7 +1968,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)
/** ********* Feature configuration ***********/
- def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported()
+ def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
/** ********* Group coordinator configuration ***********/
val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
@@ -2038,12 +2038,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }
def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
- val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled()
+ val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled
/** ********* DelegationToken Configuration **************/
val delegationTokenSecretKey = Option(getPassword(KafkaConfig.DelegationTokenSecretKeyProp))
.getOrElse(getPassword(KafkaConfig.DelegationTokenSecretKeyAliasProp))
- val tokenAuthEnabled = (delegationTokenSecretKey != null && !delegationTokenSecretKey.value.isEmpty)
+ val tokenAuthEnabled = delegationTokenSecretKey != null && delegationTokenSecretKey.value.nonEmpty
val delegationTokenMaxLifeMs = getLong(KafkaConfig.DelegationTokenMaxLifeTimeProp)
val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
@@ -2222,7 +2222,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
def usesTopicId: Boolean =
- usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported()
+ usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported
val isRemoteLogStorageSystemEnabled: lang.Boolean = getBoolean(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP)
@@ -2310,7 +2310,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
require(advertisedListenerNames.nonEmpty,
"There must be at least one advertised listener." + (
- if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else ""))
+ if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in $ControllerListenerNamesProp?" else ""))
}
if (processRoles == Set(BrokerRole)) {
// KRaft broker-only
@@ -2434,11 +2434,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
if (maxConnectionsPerIp == 0)
- require(!maxConnectionsPerIpOverrides.isEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" +
+ require(maxConnectionsPerIpOverrides.nonEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" +
s" ${KafkaConfig.MaxConnectionsPerIpOverridesProp} property is set.")
val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address))
- if (!invalidAddresses.isEmpty)
+ if (invalidAddresses.nonEmpty)
throw new IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp} contains invalid addresses : ${invalidAddresses.mkString(",")}")
if (connectionsMaxIdleMs >= 0)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 92d4274bcb6..c28cfc6f543 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -120,10 +120,10 @@ object KafkaRaftServer {
sealed trait ProcessRole
case object BrokerRole extends ProcessRole {
- override def toString(): String = "broker"
+ override def toString: String = "broker"
}
case object ControllerRole extends ProcessRole {
- override def toString(): String = "controller"
+ override def toString: String = "controller"
}
/**
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 97af1688482..ec4425de9e6 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -117,7 +117,7 @@ class PartitionMetadataFile(val file: File,
try {
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
writer.flush()
- fileOutputStream.getFD().sync()
+ fileOutputStream.getFD.sync()
} finally {
writer.close()
}
diff --git a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
index 819fcc3d38d..c33bec98a67 100644
--- a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
@@ -35,7 +35,7 @@ class AclPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
- override def name(): String = s"AclPublisher ${nodeType} id=${nodeId}"
+ override def name(): String = s"AclPublisher $nodeType id=$nodeId"
var completedInitialLoad = false
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index ff183324166..3e4f798abb4 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -136,7 +136,7 @@ final class BrokerServerMetrics private (
}
-final object BrokerServerMetrics {
+object BrokerServerMetrics {
private val metricGroupName = "broker-metadata-metrics"
private def addMetric[T](metrics: Metrics, name: MetricName)(func: Long => T): Unit = {
diff --git a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
index 59f91970f6b..34e14442b4d 100644
--- a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
@@ -35,7 +35,7 @@ class DelegationTokenPublisher(
var _firstPublish = true
- override def name(): String = s"DelegationTokenPublisher ${nodeType} id=${conf.nodeId}"
+ override def name(): String = s"DelegationTokenPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
@@ -58,7 +58,7 @@ class DelegationTokenPublisher(
if (_firstPublish) {
// Initialize the tokenCache with the Image
Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
- delegationTokenImage.tokens().forEach { (tokenId, delegationTokenData) =>
+ delegationTokenImage.tokens().forEach { (_, delegationTokenData) =>
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
}
}
@@ -77,7 +77,7 @@ class DelegationTokenPublisher(
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
- s"publishing DelegationToken changes from ${deltaName}", t)
+ s"publishing DelegationToken changes from $deltaName", t)
}
}
}
diff --git a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
index 0ac93a46db1..94aaebf00a6 100644
--- a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
@@ -32,7 +32,7 @@ class DynamicClientQuotaPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
- override def name(): String = s"DynamicClientQuotaPublisher ${nodeType} id=${conf.nodeId}"
+ override def name(): String = s"DynamicClientQuotaPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
@@ -53,7 +53,7 @@ class DynamicClientQuotaPublisher(
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
- s"publishing dynamic client quota changes from ${deltaName}", t)
+ s"publishing dynamic client quota changes from $deltaName", t)
}
}
}
diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
index b5db4e246b6..8f16d6c0513 100644
--- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
@@ -35,7 +35,7 @@ class DynamicConfigPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
- override def name(): String = s"DynamicConfigPublisher ${nodeType} id=${conf.nodeId}"
+ override def name(): String = s"DynamicConfigPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
@@ -66,7 +66,7 @@ class DynamicConfigPublisher(
} catch {
case t: Throwable => faultHandler.handleFault("Error updating topic " +
s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
- s"in ${deltaName}", t)
+ s"in $deltaName", t)
}
)
case BROKER =>
@@ -81,7 +81,7 @@ class DynamicConfigPublisher(
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
- s"in ${deltaName}", t)
+ s"in $deltaName", t)
}
} else if (resource.name() == conf.nodeId.toString) {
try {
@@ -97,7 +97,7 @@ class DynamicConfigPublisher(
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
- s"in ${deltaName}", t)
+ s"in $deltaName", t)
}
}
)
@@ -107,7 +107,7 @@ class DynamicConfigPublisher(
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
- s"publishing dynamic configuration changes from ${deltaName}", t)
+ s"publishing dynamic configuration changes from $deltaName", t)
}
}
diff --git a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
index 535ca6e8b57..bacac660e7b 100644
--- a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
@@ -33,7 +33,7 @@ class ScramPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
- override def name(): String = s"ScramPublisher ${nodeType} id=${conf.nodeId}"
+ override def name(): String = s"ScramPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
@@ -65,7 +65,7 @@ class ScramPublisher(
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
- s"publishing SCRAM changes from ${deltaName}", t)
+ s"publishing SCRAM changes from $deltaName", t)
}
}
}
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 9307a5d3996..88e9f8aa2a8 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -94,7 +94,7 @@ object CoreUtils {
*/
def registerMBean(mbean: Object, name: String): Boolean = {
try {
- val mbs = ManagementFactory.getPlatformMBeanServer()
+ val mbs = ManagementFactory.getPlatformMBeanServer
mbs synchronized {
val objName = new ObjectName(name)
if (mbs.isRegistered(objName))
@@ -141,7 +141,7 @@ object CoreUtils {
* Create an instance of the class with the given class name
*/
def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
- val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader()).asInstanceOf[Class[T]]
+ val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader).asInstanceOf[Class[T]]
val constructor = klass.getConstructor(args.map(_.getClass): _*)
constructor.newInstance(args: _*)
}
@@ -173,7 +173,7 @@ object CoreUtils {
}
def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
- listenerListToEndPoints(listeners, securityProtocolMap, true)
+ listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true)
}
def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index e49f3a57f1e..5fbbebed475 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -33,15 +33,15 @@ import javax.management.ObjectName
object Mx4jLoader extends Logging {
def maybeLoad(): Boolean = {
- val props = new VerifiableProperties(System.getProperties())
- if (!props.getBoolean("kafka_mx4jenable", false))
+ val props = new VerifiableProperties(System.getProperties)
+ if (!props.getBoolean("kafka_mx4jenable", default = false))
return false
val address = props.getString("mx4jaddress", "0.0.0.0")
val port = props.getInt("mx4jport", 8082)
try {
debug("Will try to load MX4j now, if it's in the classpath")
- val mbs = ManagementFactory.getPlatformMBeanServer()
+ val mbs = ManagementFactory.getPlatformMBeanServer
val processorName = new ObjectName("Server:name=XSLTProcessor")
val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
index 1d89e3fe021..d4737be08ce 100644
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -78,7 +78,7 @@ class NoOpPasswordEncoder extends PasswordEncoder {
* @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
* @param iterations Iteration count used for encoding.
*
- * The provided `keyFactoryAlgorithm`, 'cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
+ * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
* The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
*
*/
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index 824c7dcfc28..1e77624a9fb 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -54,7 +54,7 @@ class Throttler(@volatile var desiredRatePerSec: Double,
def maybeThrottle(observed: Double): Unit = {
val msPerSec = TimeUnit.SECONDS.toMillis(1)
val nsPerSec = TimeUnit.SECONDS.toNanos(1)
- val currentDesiredRatePerSec = desiredRatePerSec;
+ val currentDesiredRatePerSec = desiredRatePerSec
meter.mark(observed.toLong)
lock synchronized {
@@ -83,7 +83,7 @@ class Throttler(@volatile var desiredRatePerSec: Double,
}
def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
- desiredRatePerSec = updatedDesiredRatePerSec;
+ desiredRatePerSec = updatedDesiredRatePerSec
}
}
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 10586317f65..8f3ae49b7aa 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -32,7 +32,7 @@ object ToolsUtils {
val validHostPort = hostPorts.filter { hostPortData =>
org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
}
- val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
+ val isValid = !validHostPort.isEmpty && validHostPort.length == hostPorts.length
if (!isValid)
CommandLineUtils.printUsageAndExit(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
}
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 462a7436bb1..54490e3dd65 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -161,7 +161,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
}
}
- def getBoolean(name: String) = getString(name).toBoolean
+ def getBoolean(name: String): Boolean = getString(name).toBoolean
/**
* Get a string property, or, if no such property is defined, return the given default value