
MINOR: Various Scala cleanups in core (#14558)

Reviewers: Ismael Juma <ismael@juma.me.uk>
Mickael Maison 11 months ago committed by GitHub
commit 9d04c7a045
1. core/src/main/scala/kafka/common/UnknownCodecException.scala (26 lines changed)
2. core/src/main/scala/kafka/server/AbstractFetcherThread.scala (11 lines changed)
3. core/src/main/scala/kafka/server/BrokerLifecycleManager.scala (4 lines changed)
4. core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala (5 lines changed)
5. core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala (10 lines changed)
6. core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala (1 line changed)
7. core/src/main/scala/kafka/server/ControllerRegistrationManager.scala (8 lines changed)
8. core/src/main/scala/kafka/server/DelayedFetch.scala (6 lines changed)
9. core/src/main/scala/kafka/server/DelayedOperation.scala (2 lines changed)
10. core/src/main/scala/kafka/server/DelayedProduce.scala (4 lines changed)
11. core/src/main/scala/kafka/server/KafkaConfig.scala (32 lines changed)
12. core/src/main/scala/kafka/server/KafkaRaftServer.scala (4 lines changed)
13. core/src/main/scala/kafka/server/PartitionMetadataFile.scala (2 lines changed)
14. core/src/main/scala/kafka/server/metadata/AclPublisher.scala (2 lines changed)
15. core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala (2 lines changed)
16. core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala (6 lines changed)
17. core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala (4 lines changed)
18. core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala (10 lines changed)
19. core/src/main/scala/kafka/server/metadata/ScramPublisher.scala (4 lines changed)
20. core/src/main/scala/kafka/utils/CoreUtils.scala (6 lines changed)
21. core/src/main/scala/kafka/utils/Mx4jLoader.scala (6 lines changed)
22. core/src/main/scala/kafka/utils/PasswordEncoder.scala (2 lines changed)
23. core/src/main/scala/kafka/utils/Throttler.scala (4 lines changed)
24. core/src/main/scala/kafka/utils/ToolsUtils.scala (2 lines changed)
25. core/src/main/scala/kafka/utils/VerifiableProperties.scala (2 lines changed)
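
The per-file diffs below are mechanical style cleanups rather than behavioral changes: explicit types on public vals and defs, nonEmpty in place of negated isEmpty, removal of trailing semicolons and redundant parentheses, and brace-free string interpolation for simple identifiers. As a quick orientation, here is a small self-contained sketch of those patterns; the class and members are made up for illustration and are not taken from the Kafka sources.

```scala
import java.util.concurrent.TimeUnit

// Illustrative example only: shows the cleanup patterns applied in this commit,
// using a made-up class rather than actual Kafka code.
class StyleExample(names: Seq[String]) {
  // Before: val windowMs = TimeUnit.SECONDS.toMillis(30);  (inferred type, trailing semicolon)
  // After: explicit type on a public member, no semicolon.
  val windowMs: Long = TimeUnit.SECONDS.toMillis(30)

  // Before: if (!names.isEmpty) ...  After: prefer nonEmpty.
  def hasNames: Boolean = names.nonEmpty

  // Before: s"found ${names.size} entries in ${windowMs} ms"
  // After: braces only where the interpolated expression needs them.
  def summary: String = s"found ${names.size} entries in $windowMs ms"

  // Before: def describe() = names.mkString(",")  (inferred result type)
  // After: explicit result type on a public method.
  def describe(): String = names.mkString(",")
}
```

As the diffs show, the rewrites are intended to be behavior-preserving; they mainly make public signatures explicit and quiet compiler and linter warnings.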

core/src/main/scala/kafka/common/UnknownCodecException.scala (26 lines changed)

@@ -1,26 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Indicates the client has requested a range no longer available on the server
*/
class UnknownCodecException(message: String) extends RuntimeException(message) {
def this() = this(null)
}

core/src/main/scala/kafka/server/AbstractFetcherThread.scala (11 lines changed)

@@ -17,6 +17,7 @@
package kafka.server
import com.yammer.metrics.core.Meter
import kafka.common.ClientIdAndBroker
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.CoreUtils.inLock
@@ -759,7 +760,7 @@ abstract class AbstractFetcherThread(name: String,
leaderEpochInRequest: Optional[Integer],
fetchPartitionData: PartitionData): Boolean = {
try {
val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData);
val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData)
// TODO: use fetchTierStateMachine.maybeAdvanceState when implementing async tiering logic in KAFKA-13560
@@ -879,7 +880,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
lagVal.set(newLag)
}
def lag = lagVal.get
def lag: Long = lagVal.get
def unregister(): Unit = {
metricsGroup.removeMetric(FetcherMetrics.ConsumerLag, tags)
@@ -909,13 +910,13 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
class FetcherStats(metricId: ClientIdAndBroker) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val tags = Map("clientId" -> metricId.clientId,
val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
"brokerHost" -> metricId.brokerHost,
"brokerPort" -> metricId.brokerPort.toString).asJava
val requestRate = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val requestRate: Meter = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val byteRate = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags)
val byteRate: Meter = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags)
def unregister(): Unit = {
metricsGroup.removeMetric(FetcherMetrics.RequestsPerSec, tags)

core/src/main/scala/kafka/server/BrokerLifecycleManager.scala (4 lines changed)

@@ -252,7 +252,7 @@ class BrokerLifecycleManager(
* Start shutting down the BrokerLifecycleManager, but do not block.
*/
def beginShutdown(): Unit = {
eventQueue.beginShutdown("beginShutdown");
eventQueue.beginShutdown("beginShutdown")
}
/**
@@ -483,7 +483,7 @@ class BrokerLifecycleManager(
override def run(): Unit = {
if (!initialRegistrationSucceeded) {
error("Shutting down because we were unable to register with the controller quorum.")
eventQueue.beginShutdown("registrationTimeout");
eventQueue.beginShutdown("registrationTimeout")
}
}
}

core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala (5 lines changed)

@@ -28,11 +28,10 @@ import org.apache.kafka.server.quota.ClientQuotaCallback
import scala.jdk.CollectionConverters._
object ClientRequestQuotaManager {
val QuotaRequestPercentDefault = Int.MaxValue.toDouble
val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
val NanosToPercentagePerSecond: Double = 100.0 / TimeUnit.SECONDS.toNanos(1)
// Since exemptSensor is for all clients and has a constant name, we do not expire exemptSensor and only
// create once.
val DefaultInactiveExemptSensorExpirationTimeSeconds = Long.MaxValue
val DefaultInactiveExemptSensorExpirationTimeSeconds: Long = Long.MaxValue
private val ExemptSensorName = "exempt-" + QuotaType.Request
}

core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala (10 lines changed)

@@ -46,7 +46,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
private def validateTopicName(
name: String
): Unit = {
if (name.isEmpty()) {
if (name.isEmpty) {
throw new InvalidRequestException("Default topic resources are not allowed.")
}
Topic.validate(name)
@@ -55,7 +55,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
private def validateBrokerName(
name: String
): Unit = {
if (!name.isEmpty()) {
if (name.nonEmpty) {
val brokerId = try {
Integer.valueOf(name)
} catch {
@@ -96,10 +96,10 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
val properties = new Properties()
val nullTopicConfigs = new mutable.ArrayBuffer[String]()
config.entrySet().forEach(e => {
if (e.getValue() == null) {
nullTopicConfigs += e.getKey()
if (e.getValue == null) {
nullTopicConfigs += e.getKey
} else {
properties.setProperty(e.getKey(), e.getValue())
properties.setProperty(e.getKey, e.getValue)
}
})
if (nullTopicConfigs.nonEmpty) {

core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala (1 line changed)

@@ -132,7 +132,6 @@ class PermissiveControllerMutationQuota(private val time: Time,
}
object ControllerMutationQuotaManager {
val QuotaControllerMutationDefault = Int.MaxValue.toDouble
/**
* This calculates the amount of time needed to bring the TokenBucket within quota

core/src/main/scala/kafka/server/ControllerRegistrationManager.scala (8 lines changed)

@@ -143,7 +143,7 @@ class ControllerRegistrationManager(
* Start shutting down the ControllerRegistrationManager, but do not block.
*/
def beginShutdown(): Unit = {
eventQueue.beginShutdown("beginShutdown");
eventQueue.beginShutdown("beginShutdown")
}
/**
@@ -206,7 +206,7 @@ class ControllerRegistrationManager(
info("maybeSendControllerRegistration: cannot register yet because the metadata version is " +
s"still $metadataVersion, which does not support KIP-919 controller registration.")
} else if (pendingRpc) {
info("maybeSendControllerRegistration: waiting for the previous RPC to complete.");
info("maybeSendControllerRegistration: waiting for the previous RPC to complete.")
} else {
sendControllerRegistration()
}
@@ -224,7 +224,7 @@ class ControllerRegistrationManager(
setControllerId(nodeId).
setFeatures(features).
setIncarnationId(incarnationId).
setListeners(listenerInfo.toControllerRegistrationRequest())
setListeners(listenerInfo.toControllerRegistrationRequest)
info(s"sendControllerRegistration: attempting to send $data")
_channelManager.sendRequest(new ControllerRegistrationRequest.Builder(data),
new RegistrationResponseHandler())
@@ -274,7 +274,7 @@ class ControllerRegistrationManager(
}
private def scheduleNextCommunication(intervalMs: Long): Unit = {
trace(s"Scheduling next communication at ${intervalMs} ms from now.")
trace(s"Scheduling next communication at $intervalMs ms from now.")
val deadlineNs = time.nanoseconds() + MILLISECONDS.toNanos(intervalMs)
eventQueue.scheduleDeferred("communication",
new DeadlineFunction(deadlineNs),

core/src/main/scala/kafka/server/DelayedFetch.scala (6 lines changed)

@@ -17,6 +17,8 @@
package kafka.server
import com.yammer.metrics.core.Meter
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
@@ -184,7 +186,7 @@ class DelayedFetch(
object DelayedFetchMetrics {
private val metricsGroup = new KafkaMetricsGroup(DelayedFetchMetrics.getClass)
private val FetcherTypeKey = "fetcherType"
val followerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
val consumerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
}

core/src/main/scala/kafka/server/DelayedOperation.scala (2 lines changed)

@@ -167,7 +167,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
* Return all the current watcher lists,
* note that the returned watchers may be removed from the list by other threads
*/
def allWatchers = {
def allWatchers: Iterable[Watchers] = {
watchersByKey.values
}
}

core/src/main/scala/kafka/server/DelayedProduce.scala (4 lines changed)

@@ -34,7 +34,7 @@ import scala.jdk.CollectionConverters._
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
@volatile var acksPending = false
override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
override def toString: String = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " +
s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]"
}
@@ -62,7 +62,7 @@ class DelayedProduce(delayMs: Long,
lockOpt: Option[Lock] = None)
extends DelayedOperation(delayMs, lockOpt) {
override lazy val logger = DelayedProduce.logger
override lazy val logger: Logger = DelayedProduce.logger
// first update the acks pending variable according to the error code
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>

core/src/main/scala/kafka/server/KafkaConfig.scala (32 lines changed)

@@ -83,7 +83,7 @@ object Defaults {
val BrokerHeartbeatIntervalMs = 2000
val BrokerSessionTimeoutMs = 9000
val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1);
val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1)
val MetadataMaxIdleIntervalMs = 500
val MetadataMaxRetentionBytes = 100 * 1024 * 1024
val DeleteTopicEnable = true
@@ -224,7 +224,7 @@ object Defaults {
val MetricNumSamples = 2
val MetricSampleWindowMs = 30000
val MetricReporterClasses = ""
val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString
val AutoIncludeJmxReporter = true
@@ -241,7 +241,7 @@ object Defaults {
val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
val SslEndpointIdentificationAlgorithm = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
val SslClientAuthentication = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT)
val SslClientAuthenticationValidValues = SslClientAuth.VALUES.asScala.map(v => v.toString().toLowerCase(Locale.ROOT)).asJava.toArray(new Array[String](0))
val SslClientAuthenticationValidValues = SslClientAuth.VALUES.asScala.map(v => v.toString.toLowerCase(Locale.ROOT)).asJava.toArray(new Array[String](0))
val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
/** ********* General Security configuration ***********/
@@ -754,7 +754,7 @@ object KafkaConfig {
"maximum bytes limit is reached."
val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often the active " +
"controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}";
s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}"
val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the controller. This is required " +
"if running in KRaft mode. When communicating with the controller quorum, the broker will always use the first listener in this list.\n " +
"Note: The ZooKeeper-based controller should not set this configuration."
@@ -886,7 +886,7 @@ object KafkaConfig {
val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound " +
"on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise " +
"tombstones messages may be collected before a consumer completes their scan).";
"tombstones messages may be collected before a consumer completes their scan)."
val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted."
val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
@@ -915,13 +915,13 @@ object KafkaConfig {
"broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's " +
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
val LogMessageTimestampAfterMaxMsDoc = "This configuration sets the allowable timestamp difference between the " +
"message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " +
"timestamp, with the maximum allowable difference determined by the value set in this configuration. " +
"If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.";
"this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server."
@@ -939,7 +939,7 @@ object KafkaConfig {
"implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " +
"implement the <code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface."
val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC;
val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels."
@@ -1949,7 +1949,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// A user-supplied IBP was given
val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
if (!configuredVersion.isKRaftSupported) {
throw new ConfigException(s"A non-KRaft version ${interBrokerProtocolVersionString} given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
throw new ConfigException(s"A non-KRaft version $interBrokerProtocolVersionString given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
} else {
warn(s"${KafkaConfig.InterBrokerProtocolVersionProp} is deprecated in KRaft mode as of 3.3 and will only " +
@@ -1968,7 +1968,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)
/** ********* Feature configuration ***********/
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported()
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
/** ********* Group coordinator configuration ***********/
val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
@@ -2038,12 +2038,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }
def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled()
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled
/** ********* DelegationToken Configuration **************/
val delegationTokenSecretKey = Option(getPassword(KafkaConfig.DelegationTokenSecretKeyProp))
.getOrElse(getPassword(KafkaConfig.DelegationTokenSecretKeyAliasProp))
val tokenAuthEnabled = (delegationTokenSecretKey != null && !delegationTokenSecretKey.value.isEmpty)
val tokenAuthEnabled = delegationTokenSecretKey != null && delegationTokenSecretKey.value.nonEmpty
val delegationTokenMaxLifeMs = getLong(KafkaConfig.DelegationTokenMaxLifeTimeProp)
val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
@@ -2222,7 +2222,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
def usesTopicId: Boolean =
usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported()
usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported
val isRemoteLogStorageSystemEnabled: lang.Boolean = getBoolean(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP)
@@ -2310,7 +2310,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
require(advertisedListenerNames.nonEmpty,
"There must be at least one advertised listener." + (
if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else ""))
if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in $ControllerListenerNamesProp?" else ""))
}
if (processRoles == Set(BrokerRole)) {
// KRaft broker-only
@@ -2434,11 +2434,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
if (maxConnectionsPerIp == 0)
require(!maxConnectionsPerIpOverrides.isEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" +
require(maxConnectionsPerIpOverrides.nonEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" +
s" ${KafkaConfig.MaxConnectionsPerIpOverridesProp} property is set.")
val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address))
if (!invalidAddresses.isEmpty)
if (invalidAddresses.nonEmpty)
throw new IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp} contains invalid addresses : ${invalidAddresses.mkString(",")}")
if (connectionsMaxIdleMs >= 0)
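
The log.message.timestamp.before.max.ms and log.message.timestamp.after.max.ms doc strings above describe a bounds check between the broker's timestamp and the message timestamp, which is skipped when log.message.timestamp.type=LogAppendTime. A rough sketch of that documented behavior follows; the helper name and structure are illustrative, not Kafka's actual validation code.

```scala
// Hypothetical helper illustrating the documented semantics of
// log.message.timestamp.before.max.ms / log.message.timestamp.after.max.ms.
// With LogAppendTime the check is skipped, as the doc text states.
def timestampWithinBounds(messageTs: Long, brokerTs: Long,
                          beforeMaxMs: Long, afterMaxMs: Long,
                          messageTimestampType: String): Boolean = {
  if (messageTimestampType == "LogAppendTime") true
  else {
    val behind = brokerTs - messageTs // positive when the message timestamp is earlier than the broker's
    if (behind >= 0) behind <= beforeMaxMs else -behind <= afterMaxMs
  }
}
```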

core/src/main/scala/kafka/server/KafkaRaftServer.scala (4 lines changed)

@@ -120,10 +120,10 @@ object KafkaRaftServer {
sealed trait ProcessRole
case object BrokerRole extends ProcessRole {
override def toString(): String = "broker"
override def toString: String = "broker"
}
case object ControllerRole extends ProcessRole {
override def toString(): String = "controller"
override def toString: String = "controller"
}
/**

core/src/main/scala/kafka/server/PartitionMetadataFile.scala (2 lines changed)

@@ -117,7 +117,7 @@ class PartitionMetadataFile(val file: File,
try {
writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
writer.flush()
fileOutputStream.getFD().sync()
fileOutputStream.getFD.sync()
} finally {
writer.close()
}

core/src/main/scala/kafka/server/metadata/AclPublisher.scala (2 lines changed)

@@ -35,7 +35,7 @@ class AclPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
override def name(): String = s"AclPublisher ${nodeType} id=${nodeId}"
override def name(): String = s"AclPublisher $nodeType id=$nodeId"
var completedInitialLoad = false

core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala (2 lines changed)

@@ -136,7 +136,7 @@ final class BrokerServerMetrics private (
}
final object BrokerServerMetrics {
object BrokerServerMetrics {
private val metricGroupName = "broker-metadata-metrics"
private def addMetric[T](metrics: Metrics, name: MetricName)(func: Long => T): Unit = {

core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala (6 lines changed)

@@ -35,7 +35,7 @@ class DelegationTokenPublisher(
var _firstPublish = true
override def name(): String = s"DelegationTokenPublisher ${nodeType} id=${conf.nodeId}"
override def name(): String = s"DelegationTokenPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
@@ -58,7 +58,7 @@ class DelegationTokenPublisher(
if (_firstPublish) {
// Initialize the tokenCache with the Image
Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
delegationTokenImage.tokens().forEach { (tokenId, delegationTokenData) =>
delegationTokenImage.tokens().forEach { (_, delegationTokenData) =>
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
}
}
@@ -77,7 +77,7 @@ class DelegationTokenPublisher(
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
s"publishing DelegationToken changes from ${deltaName}", t)
s"publishing DelegationToken changes from $deltaName", t)
}
}
}

core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala (4 lines changed)

@@ -32,7 +32,7 @@ class DynamicClientQuotaPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
override def name(): String = s"DynamicClientQuotaPublisher ${nodeType} id=${conf.nodeId}"
override def name(): String = s"DynamicClientQuotaPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
@@ -53,7 +53,7 @@ class DynamicClientQuotaPublisher(
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
s"publishing dynamic client quota changes from ${deltaName}", t)
s"publishing dynamic client quota changes from $deltaName", t)
}
}
}

core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala (10 lines changed)

@@ -35,7 +35,7 @@ class DynamicConfigPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
override def name(): String = s"DynamicConfigPublisher ${nodeType} id=${conf.nodeId}"
override def name(): String = s"DynamicConfigPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
@@ -66,7 +66,7 @@ class DynamicConfigPublisher(
} catch {
case t: Throwable => faultHandler.handleFault("Error updating topic " +
s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
s"in ${deltaName}", t)
s"in $deltaName", t)
}
)
case BROKER =>
@@ -81,7 +81,7 @@ class DynamicConfigPublisher(
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
s"in ${deltaName}", t)
s"in $deltaName", t)
}
} else if (resource.name() == conf.nodeId.toString) {
try {
@@ -97,7 +97,7 @@ class DynamicConfigPublisher(
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
s"in ${deltaName}", t)
s"in $deltaName", t)
}
}
)
@@ -107,7 +107,7 @@ class DynamicConfigPublisher(
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
s"publishing dynamic configuration changes from ${deltaName}", t)
s"publishing dynamic configuration changes from $deltaName", t)
}
}

core/src/main/scala/kafka/server/metadata/ScramPublisher.scala (4 lines changed)

@@ -33,7 +33,7 @@ class ScramPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
override def name(): String = s"ScramPublisher ${nodeType} id=${conf.nodeId}"
override def name(): String = s"ScramPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
@@ -65,7 +65,7 @@ class ScramPublisher(
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
s"publishing SCRAM changes from ${deltaName}", t)
s"publishing SCRAM changes from $deltaName", t)
}
}
}

core/src/main/scala/kafka/utils/CoreUtils.scala (6 lines changed)

@@ -94,7 +94,7 @@ object CoreUtils {
*/
def registerMBean(mbean: Object, name: String): Boolean = {
try {
val mbs = ManagementFactory.getPlatformMBeanServer()
val mbs = ManagementFactory.getPlatformMBeanServer
mbs synchronized {
val objName = new ObjectName(name)
if (mbs.isRegistered(objName))
@@ -141,7 +141,7 @@ object CoreUtils {
* Create an instance of the class with the given class name
*/
def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader()).asInstanceOf[Class[T]]
val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader).asInstanceOf[Class[T]]
val constructor = klass.getConstructor(args.map(_.getClass): _*)
constructor.newInstance(args: _*)
}
@@ -173,7 +173,7 @@
}
def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
listenerListToEndPoints(listeners, securityProtocolMap, true)
listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true)
}
def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {

core/src/main/scala/kafka/utils/Mx4jLoader.scala (6 lines changed)

@@ -33,15 +33,15 @@ import javax.management.ObjectName
object Mx4jLoader extends Logging {
def maybeLoad(): Boolean = {
val props = new VerifiableProperties(System.getProperties())
if (!props.getBoolean("kafka_mx4jenable", false))
val props = new VerifiableProperties(System.getProperties)
if (!props.getBoolean("kafka_mx4jenable", default = false))
return false
val address = props.getString("mx4jaddress", "0.0.0.0")
val port = props.getInt("mx4jport", 8082)
try {
debug("Will try to load MX4j now, if it's in the classpath")
val mbs = ManagementFactory.getPlatformMBeanServer()
val mbs = ManagementFactory.getPlatformMBeanServer
val processorName = new ObjectName("Server:name=XSLTProcessor")
val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")

core/src/main/scala/kafka/utils/PasswordEncoder.scala (2 lines changed)

@@ -78,7 +78,7 @@ class NoOpPasswordEncoder extends PasswordEncoder {
* @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
* @param iterations Iteration count used for encoding.
*
* The provided `keyFactoryAlgorithm`, 'cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
* The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
* The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
*
*/
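
For readers unfamiliar with the parameters named in that doc comment, the sketch below shows how a keyFactoryAlgorithm, cipherAlgorithm, keyLength, and iteration count typically combine in password-based encryption using the standard javax.crypto APIs. It is a hypothetical illustration, not Kafka's PasswordEncoder implementation; the class name and default values are assumptions.

```scala
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.util.Base64
import javax.crypto.{Cipher, SecretKeyFactory}
import javax.crypto.spec.{IvParameterSpec, PBEKeySpec, SecretKeySpec}

// Hypothetical encoder: derive an AES key from a secret via PBKDF2, then encrypt.
// keyFactoryAlgorithm, cipherAlgorithm, keyLength and iterations mirror the
// parameters described in the PasswordEncoder doc comment above.
class ExamplePasswordEncoder(secret: String,
                             keyFactoryAlgorithm: String = "PBKDF2WithHmacSHA512",
                             cipherAlgorithm: String = "AES/CBC/PKCS5Padding",
                             keyLength: Int = 128,
                             iterations: Int = 4096) {
  private val random = new SecureRandom()

  def encode(password: String): String = {
    val salt = new Array[Byte](16)
    val iv = new Array[Byte](16)
    random.nextBytes(salt)
    random.nextBytes(iv)
    // Derive the encryption key from the secret using the configured key factory.
    val keySpec = new PBEKeySpec(secret.toCharArray, salt, iterations, keyLength)
    val derived = SecretKeyFactory.getInstance(keyFactoryAlgorithm).generateSecret(keySpec)
    val cipher = Cipher.getInstance(cipherAlgorithm)
    cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(derived.getEncoded, "AES"), new IvParameterSpec(iv))
    val encrypted = cipher.doFinal(password.getBytes(StandardCharsets.UTF_8))
    // Store everything needed for decoding alongside the ciphertext, as the doc comment notes.
    Seq(salt, iv, encrypted).map(b => Base64.getEncoder.encodeToString(b)).mkString(":")
  }
}
```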

core/src/main/scala/kafka/utils/Throttler.scala (4 lines changed)

@@ -54,7 +54,7 @@ class Throttler(@volatile var desiredRatePerSec: Double,
def maybeThrottle(observed: Double): Unit = {
val msPerSec = TimeUnit.SECONDS.toMillis(1)
val nsPerSec = TimeUnit.SECONDS.toNanos(1)
val currentDesiredRatePerSec = desiredRatePerSec;
val currentDesiredRatePerSec = desiredRatePerSec
meter.mark(observed.toLong)
lock synchronized {
@@ -83,7 +83,7 @@
}
def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
desiredRatePerSec = updatedDesiredRatePerSec;
desiredRatePerSec = updatedDesiredRatePerSec
}
}

core/src/main/scala/kafka/utils/ToolsUtils.scala (2 lines changed)

@@ -32,7 +32,7 @@ object ToolsUtils {
val validHostPort = hostPorts.filter { hostPortData =>
org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
}
val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
val isValid = !validHostPort.isEmpty && validHostPort.length == hostPorts.length
if (!isValid)
CommandLineUtils.printUsageAndExit(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
}

core/src/main/scala/kafka/utils/VerifiableProperties.scala (2 lines changed)

@@ -161,7 +161,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
}
}
def getBoolean(name: String) = getString(name).toBoolean
def getBoolean(name: String): Boolean = getString(name).toBoolean
/**
* Get a string property, or, if no such property is defined, return the given default value
