From 9d04c7a0459ff4e13d9385427eb244a47a1071da Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 17 Oct 2023 12:04:14 +0200 Subject: [PATCH] MINOR: Various Scala cleanups in core (#14558) Reviewers: Ismael Juma --- .../kafka/common/UnknownCodecException.scala | 26 --------------- .../kafka/server/AbstractFetcherThread.scala | 11 ++++--- .../kafka/server/BrokerLifecycleManager.scala | 4 +-- .../server/ClientRequestQuotaManager.scala | 5 ++- .../ControllerConfigurationValidator.scala | 10 +++--- .../ControllerMutationQuotaManager.scala | 1 - .../ControllerRegistrationManager.scala | 8 ++--- .../scala/kafka/server/DelayedFetch.scala | 6 ++-- .../scala/kafka/server/DelayedOperation.scala | 2 +- .../scala/kafka/server/DelayedProduce.scala | 4 +-- .../main/scala/kafka/server/KafkaConfig.scala | 32 +++++++++---------- .../scala/kafka/server/KafkaRaftServer.scala | 4 +-- .../kafka/server/PartitionMetadataFile.scala | 2 +- .../kafka/server/metadata/AclPublisher.scala | 2 +- .../server/metadata/BrokerServerMetrics.scala | 2 +- .../metadata/DelegationTokenPublisher.scala | 6 ++-- .../DynamicClientQuotaPublisher.scala | 4 +-- .../metadata/DynamicConfigPublisher.scala | 10 +++--- .../server/metadata/ScramPublisher.scala | 4 +-- .../main/scala/kafka/utils/CoreUtils.scala | 6 ++-- .../main/scala/kafka/utils/Mx4jLoader.scala | 6 ++-- .../scala/kafka/utils/PasswordEncoder.scala | 2 +- .../main/scala/kafka/utils/Throttler.scala | 4 +-- .../main/scala/kafka/utils/ToolsUtils.scala | 2 +- .../kafka/utils/VerifiableProperties.scala | 2 +- 25 files changed, 70 insertions(+), 95 deletions(-) delete mode 100644 core/src/main/scala/kafka/common/UnknownCodecException.scala diff --git a/core/src/main/scala/kafka/common/UnknownCodecException.scala b/core/src/main/scala/kafka/common/UnknownCodecException.scala deleted file mode 100644 index 7e669019c32..00000000000 --- a/core/src/main/scala/kafka/common/UnknownCodecException.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Indicates the client has requested a range no longer available on the server - */ -class UnknownCodecException(message: String) extends RuntimeException(message) { - def this() = this(null) -} - diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 450fcfea461..935599048da 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -17,6 +17,7 @@ package kafka.server +import com.yammer.metrics.core.Meter import kafka.common.ClientIdAndBroker import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} import kafka.utils.CoreUtils.inLock @@ -759,7 +760,7 @@ abstract class AbstractFetcherThread(name: String, leaderEpochInRequest: Optional[Integer], fetchPartitionData: PartitionData): Boolean = { try { - val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData); + val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitionData) // TODO: use fetchTierStateMachine.maybeAdvanceState when implementing async tiering logic in KAFKA-13560 @@ -879,7 +880,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) { lagVal.set(newLag) } - def lag = lagVal.get + def lag: Long = lagVal.get def unregister(): Unit = { metricsGroup.removeMetric(FetcherMetrics.ConsumerLag, tags) @@ -909,13 +910,13 @@ class FetcherLagStats(metricId: ClientIdAndBroker) { class FetcherStats(metricId: ClientIdAndBroker) { private val metricsGroup = new KafkaMetricsGroup(this.getClass) - val tags = Map("clientId" -> metricId.clientId, + val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId, "brokerHost" -> metricId.brokerHost, "brokerPort" -> metricId.brokerPort.toString).asJava - val requestRate = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags) + val requestRate: Meter = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags) - val byteRate = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags) + val byteRate: Meter = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags) def unregister(): Unit = { metricsGroup.removeMetric(FetcherMetrics.RequestsPerSec, tags) diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala index c17fe7c0c38..10074d4a5f5 100644 --- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala +++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala @@ -252,7 +252,7 @@ class BrokerLifecycleManager( * Start shutting down the BrokerLifecycleManager, but do not block. */ def beginShutdown(): Unit = { - eventQueue.beginShutdown("beginShutdown"); + eventQueue.beginShutdown("beginShutdown") } /** @@ -483,7 +483,7 @@ class BrokerLifecycleManager( override def run(): Unit = { if (!initialRegistrationSucceeded) { error("Shutting down because we were unable to register with the controller quorum.") - eventQueue.beginShutdown("registrationTimeout"); + eventQueue.beginShutdown("registrationTimeout") } } } diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala index 6e57d97bc3e..d330210b9d5 100644 --- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala @@ -28,11 +28,10 @@ import org.apache.kafka.server.quota.ClientQuotaCallback import scala.jdk.CollectionConverters._ object ClientRequestQuotaManager { - val QuotaRequestPercentDefault = Int.MaxValue.toDouble - val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1) + val NanosToPercentagePerSecond: Double = 100.0 / TimeUnit.SECONDS.toNanos(1) // Since exemptSensor is for all clients and has a constant name, we do not expire exemptSensor and only // create once. - val DefaultInactiveExemptSensorExpirationTimeSeconds = Long.MaxValue + val DefaultInactiveExemptSensorExpirationTimeSeconds: Long = Long.MaxValue private val ExemptSensorName = "exempt-" + QuotaType.Request } diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index ccdd0ac31af..e095ecdc051 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -46,7 +46,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu private def validateTopicName( name: String ): Unit = { - if (name.isEmpty()) { + if (name.isEmpty) { throw new InvalidRequestException("Default topic resources are not allowed.") } Topic.validate(name) @@ -55,7 +55,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu private def validateBrokerName( name: String ): Unit = { - if (!name.isEmpty()) { + if (name.nonEmpty) { val brokerId = try { Integer.valueOf(name) } catch { @@ -96,10 +96,10 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu val properties = new Properties() val nullTopicConfigs = new mutable.ArrayBuffer[String]() config.entrySet().forEach(e => { - if (e.getValue() == null) { - nullTopicConfigs += e.getKey() + if (e.getValue == null) { + nullTopicConfigs += e.getKey } else { - properties.setProperty(e.getKey(), e.getValue()) + properties.setProperty(e.getKey, e.getValue) } }) if (nullTopicConfigs.nonEmpty) { diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala index f011a6b3663..e21c4699bcd 100644 --- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala @@ -132,7 +132,6 @@ class PermissiveControllerMutationQuota(private val time: Time, } object ControllerMutationQuotaManager { - val QuotaControllerMutationDefault = Int.MaxValue.toDouble /** * This calculates the amount of time needed to bring the TokenBucket within quota diff --git a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala index c5868c05732..ae717bce624 100644 --- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala +++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala @@ -143,7 +143,7 @@ class ControllerRegistrationManager( * Start shutting down the ControllerRegistrationManager, but do not block. */ def beginShutdown(): Unit = { - eventQueue.beginShutdown("beginShutdown"); + eventQueue.beginShutdown("beginShutdown") } /** @@ -206,7 +206,7 @@ class ControllerRegistrationManager( info("maybeSendControllerRegistration: cannot register yet because the metadata version is " + s"still $metadataVersion, which does not support KIP-919 controller registration.") } else if (pendingRpc) { - info("maybeSendControllerRegistration: waiting for the previous RPC to complete."); + info("maybeSendControllerRegistration: waiting for the previous RPC to complete.") } else { sendControllerRegistration() } @@ -224,7 +224,7 @@ class ControllerRegistrationManager( setControllerId(nodeId). setFeatures(features). setIncarnationId(incarnationId). - setListeners(listenerInfo.toControllerRegistrationRequest()) + setListeners(listenerInfo.toControllerRegistrationRequest) info(s"sendControllerRegistration: attempting to send $data") _channelManager.sendRequest(new ControllerRegistrationRequest.Builder(data), new RegistrationResponseHandler()) @@ -274,7 +274,7 @@ class ControllerRegistrationManager( } private def scheduleNextCommunication(intervalMs: Long): Unit = { - trace(s"Scheduling next communication at ${intervalMs} ms from now.") + trace(s"Scheduling next communication at $intervalMs ms from now.") val deadlineNs = time.nanoseconds() + MILLISECONDS.toNanos(intervalMs) eventQueue.scheduleDeferred("communication", new DeadlineFunction(deadlineNs), diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 9ce6082e76c..f8b60b6071d 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -17,6 +17,8 @@ package kafka.server +import com.yammer.metrics.core.Meter + import java.util.concurrent.TimeUnit import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.errors._ @@ -184,7 +186,7 @@ class DelayedFetch( object DelayedFetchMetrics { private val metricsGroup = new KafkaMetricsGroup(DelayedFetchMetrics.getClass) private val FetcherTypeKey = "fetcherType" - val followerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava) - val consumerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava) + val followerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava) + val consumerExpiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava) } diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 48a17442e0e..629199148a8 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -167,7 +167,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri * Return all the current watcher lists, * note that the returned watchers may be removed from the list by other threads */ - def allWatchers = { + def allWatchers: Iterable[Watchers] = { watchersByKey.values } } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 2f1261ada23..7a21a86260c 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -34,7 +34,7 @@ import scala.jdk.CollectionConverters._ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) { @volatile var acksPending = false - override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " + + override def toString: String = s"[acksPending: $acksPending, error: ${responseStatus.error.code}, " + s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]" } @@ -62,7 +62,7 @@ class DelayedProduce(delayMs: Long, lockOpt: Option[Lock] = None) extends DelayedOperation(delayMs, lockOpt) { - override lazy val logger = DelayedProduce.logger + override lazy val logger: Logger = DelayedProduce.logger // first update the acks pending variable according to the error code produceMetadata.produceStatus.forKeyValue { (topicPartition, status) => diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 618faeee790..485d76f7661 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -83,7 +83,7 @@ object Defaults { val BrokerHeartbeatIntervalMs = 2000 val BrokerSessionTimeoutMs = 9000 val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024 - val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1); + val MetadataSnapshotMaxIntervalMs = TimeUnit.HOURS.toMillis(1) val MetadataMaxIdleIntervalMs = 500 val MetadataMaxRetentionBytes = 100 * 1024 * 1024 val DeleteTopicEnable = true @@ -224,7 +224,7 @@ object Defaults { val MetricNumSamples = 2 val MetricSampleWindowMs = 30000 val MetricReporterClasses = "" - val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString() + val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString val AutoIncludeJmxReporter = true @@ -241,7 +241,7 @@ object Defaults { val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM val SslEndpointIdentificationAlgorithm = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM val SslClientAuthentication = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT) - val SslClientAuthenticationValidValues = SslClientAuth.VALUES.asScala.map(v => v.toString().toLowerCase(Locale.ROOT)).asJava.toArray(new Array[String](0)) + val SslClientAuthenticationValidValues = SslClientAuth.VALUES.asScala.map(v => v.toString.toLowerCase(Locale.ROOT)).asJava.toArray(new Array[String](0)) val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES /** ********* General Security configuration ***********/ @@ -754,7 +754,7 @@ object KafkaConfig { "maximum bytes limit is reached." val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often the active " + "controller should write no-op records to the metadata partition. If the value is 0, no-op records " + - s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}"; + s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}" val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the controller. This is required " + "if running in KRaft mode. When communicating with the controller quorum, the broker will always use the first listener in this list.\n " + "Note: The ZooKeeper-based controller should not set this configuration." @@ -886,7 +886,7 @@ object KafkaConfig { val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size." val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound " + "on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise " + - "tombstones messages may be collected before a consumer completes their scan)."; + "tombstones messages may be collected before a consumer completes their scan)." val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted." val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted." val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index" @@ -915,13 +915,13 @@ object KafkaConfig { "broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's " + "timestamp, with the maximum allowable difference determined by the value set in this configuration. " + "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " + - "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."; + "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." val LogMessageTimestampAfterMaxMsDoc = "This configuration sets the allowable timestamp difference between the " + "message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's " + "timestamp, with the maximum allowable difference determined by the value set in this configuration. " + "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " + - "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."; + "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server." @@ -939,7 +939,7 @@ object KafkaConfig { "implement the org.apache.kafka.server.policy.CreateTopicPolicy interface." val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " + "implement the org.apache.kafka.server.policy.AlterConfigPolicy interface." - val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC; + val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels." @@ -1949,7 +1949,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami // A user-supplied IBP was given val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString) if (!configuredVersion.isKRaftSupported) { - throw new ConfigException(s"A non-KRaft version ${interBrokerProtocolVersionString} given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " + + throw new ConfigException(s"A non-KRaft version $interBrokerProtocolVersionString given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " + s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}") } else { warn(s"${KafkaConfig.InterBrokerProtocolVersionProp} is deprecated in KRaft mode as of 3.3 and will only " + @@ -1968,7 +1968,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp) /** ********* Feature configuration ***********/ - def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported() + def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported /** ********* Group coordinator configuration ***********/ val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp) @@ -2038,12 +2038,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName } def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol } def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp) - val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled() + val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled /** ********* DelegationToken Configuration **************/ val delegationTokenSecretKey = Option(getPassword(KafkaConfig.DelegationTokenSecretKeyProp)) .getOrElse(getPassword(KafkaConfig.DelegationTokenSecretKeyAliasProp)) - val tokenAuthEnabled = (delegationTokenSecretKey != null && !delegationTokenSecretKey.value.isEmpty) + val tokenAuthEnabled = delegationTokenSecretKey != null && delegationTokenSecretKey.value.nonEmpty val delegationTokenMaxLifeMs = getLong(KafkaConfig.DelegationTokenMaxLifeTimeProp) val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp) val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp) @@ -2222,7 +2222,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8 def usesTopicId: Boolean = - usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported() + usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported val isRemoteLogStorageSystemEnabled: lang.Boolean = getBoolean(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP) @@ -2310,7 +2310,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def validateAdvertisedListenersNonEmptyForBroker(): Unit = { require(advertisedListenerNames.nonEmpty, "There must be at least one advertised listener." + ( - if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else "")) + if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in $ControllerListenerNamesProp?" else "")) } if (processRoles == Set(BrokerRole)) { // KRaft broker-only @@ -2434,11 +2434,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}") if (maxConnectionsPerIp == 0) - require(!maxConnectionsPerIpOverrides.isEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" + + require(maxConnectionsPerIpOverrides.nonEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" + s" ${KafkaConfig.MaxConnectionsPerIpOverridesProp} property is set.") val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address)) - if (!invalidAddresses.isEmpty) + if (invalidAddresses.nonEmpty) throw new IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp} contains invalid addresses : ${invalidAddresses.mkString(",")}") if (connectionsMaxIdleMs >= 0) diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 92d4274bcb6..c28cfc6f543 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -120,10 +120,10 @@ object KafkaRaftServer { sealed trait ProcessRole case object BrokerRole extends ProcessRole { - override def toString(): String = "broker" + override def toString: String = "broker" } case object ControllerRole extends ProcessRole { - override def toString(): String = "controller" + override def toString: String = "controller" } /** diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala index 97af1688482..ec4425de9e6 100644 --- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala +++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala @@ -117,7 +117,7 @@ class PartitionMetadataFile(val file: File, try { writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId))) writer.flush() - fileOutputStream.getFD().sync() + fileOutputStream.getFD.sync() } finally { writer.close() } diff --git a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala index 819fcc3d38d..c33bec98a67 100644 --- a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala @@ -35,7 +35,7 @@ class AclPublisher( ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { logIdent = s"[${name()}] " - override def name(): String = s"AclPublisher ${nodeType} id=${nodeId}" + override def name(): String = s"AclPublisher $nodeType id=$nodeId" var completedInitialLoad = false diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala index ff183324166..3e4f798abb4 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala @@ -136,7 +136,7 @@ final class BrokerServerMetrics private ( } -final object BrokerServerMetrics { +object BrokerServerMetrics { private val metricGroupName = "broker-metadata-metrics" private def addMetric[T](metrics: Metrics, name: MetricName)(func: Long => T): Unit = { diff --git a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala index 59f91970f6b..34e14442b4d 100644 --- a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala @@ -35,7 +35,7 @@ class DelegationTokenPublisher( var _firstPublish = true - override def name(): String = s"DelegationTokenPublisher ${nodeType} id=${conf.nodeId}" + override def name(): String = s"DelegationTokenPublisher $nodeType id=${conf.nodeId}" override def onMetadataUpdate( delta: MetadataDelta, @@ -58,7 +58,7 @@ class DelegationTokenPublisher( if (_firstPublish) { // Initialize the tokenCache with the Image Option(newImage.delegationTokens()).foreach { delegationTokenImage => - delegationTokenImage.tokens().forEach { (tokenId, delegationTokenData) => + delegationTokenImage.tokens().forEach { (_, delegationTokenData) => tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation())) } } @@ -77,7 +77,7 @@ class DelegationTokenPublisher( } } catch { case t: Throwable => faultHandler.handleFault("Uncaught exception while " + - s"publishing DelegationToken changes from ${deltaName}", t) + s"publishing DelegationToken changes from $deltaName", t) } } } diff --git a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala index 0ac93a46db1..94aaebf00a6 100644 --- a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala @@ -32,7 +32,7 @@ class DynamicClientQuotaPublisher( ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { logIdent = s"[${name()}] " - override def name(): String = s"DynamicClientQuotaPublisher ${nodeType} id=${conf.nodeId}" + override def name(): String = s"DynamicClientQuotaPublisher $nodeType id=${conf.nodeId}" override def onMetadataUpdate( delta: MetadataDelta, @@ -53,7 +53,7 @@ class DynamicClientQuotaPublisher( } } catch { case t: Throwable => faultHandler.handleFault("Uncaught exception while " + - s"publishing dynamic client quota changes from ${deltaName}", t) + s"publishing dynamic client quota changes from $deltaName", t) } } } diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala index b5db4e246b6..8f16d6c0513 100644 --- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala @@ -35,7 +35,7 @@ class DynamicConfigPublisher( ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { logIdent = s"[${name()}] " - override def name(): String = s"DynamicConfigPublisher ${nodeType} id=${conf.nodeId}" + override def name(): String = s"DynamicConfigPublisher $nodeType id=${conf.nodeId}" override def onMetadataUpdate( delta: MetadataDelta, @@ -66,7 +66,7 @@ class DynamicConfigPublisher( } catch { case t: Throwable => faultHandler.handleFault("Error updating topic " + s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in ${deltaName}", t) + s"in $deltaName", t) } ) case BROKER => @@ -81,7 +81,7 @@ class DynamicConfigPublisher( } catch { case t: Throwable => faultHandler.handleFault("Error updating " + s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in ${deltaName}", t) + s"in $deltaName", t) } } else if (resource.name() == conf.nodeId.toString) { try { @@ -97,7 +97,7 @@ class DynamicConfigPublisher( } catch { case t: Throwable => faultHandler.handleFault("Error updating " + s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in ${deltaName}", t) + s"in $deltaName", t) } } ) @@ -107,7 +107,7 @@ class DynamicConfigPublisher( } } catch { case t: Throwable => faultHandler.handleFault("Uncaught exception while " + - s"publishing dynamic configuration changes from ${deltaName}", t) + s"publishing dynamic configuration changes from $deltaName", t) } } diff --git a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala index 535ca6e8b57..bacac660e7b 100644 --- a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala @@ -33,7 +33,7 @@ class ScramPublisher( ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { logIdent = s"[${name()}] " - override def name(): String = s"ScramPublisher ${nodeType} id=${conf.nodeId}" + override def name(): String = s"ScramPublisher $nodeType id=${conf.nodeId}" override def onMetadataUpdate( delta: MetadataDelta, @@ -65,7 +65,7 @@ class ScramPublisher( } } catch { case t: Throwable => faultHandler.handleFault("Uncaught exception while " + - s"publishing SCRAM changes from ${deltaName}", t) + s"publishing SCRAM changes from $deltaName", t) } } } diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 9307a5d3996..88e9f8aa2a8 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -94,7 +94,7 @@ object CoreUtils { */ def registerMBean(mbean: Object, name: String): Boolean = { try { - val mbs = ManagementFactory.getPlatformMBeanServer() + val mbs = ManagementFactory.getPlatformMBeanServer mbs synchronized { val objName = new ObjectName(name) if (mbs.isRegistered(objName)) @@ -141,7 +141,7 @@ object CoreUtils { * Create an instance of the class with the given class name */ def createObject[T <: AnyRef](className: String, args: AnyRef*): T = { - val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader()).asInstanceOf[Class[T]] + val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader).asInstanceOf[Class[T]] val constructor = klass.getConstructor(args.map(_.getClass): _*) constructor.newInstance(args: _*) } @@ -173,7 +173,7 @@ object CoreUtils { } def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = { - listenerListToEndPoints(listeners, securityProtocolMap, true) + listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true) } def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = { diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala index e49f3a57f1e..5fbbebed475 100644 --- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala +++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala @@ -33,15 +33,15 @@ import javax.management.ObjectName object Mx4jLoader extends Logging { def maybeLoad(): Boolean = { - val props = new VerifiableProperties(System.getProperties()) - if (!props.getBoolean("kafka_mx4jenable", false)) + val props = new VerifiableProperties(System.getProperties) + if (!props.getBoolean("kafka_mx4jenable", default = false)) return false val address = props.getString("mx4jaddress", "0.0.0.0") val port = props.getInt("mx4jport", 8082) try { debug("Will try to load MX4j now, if it's in the classpath") - val mbs = ManagementFactory.getPlatformMBeanServer() + val mbs = ManagementFactory.getPlatformMBeanServer val processorName = new ObjectName("Server:name=XSLTProcessor") val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor") diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala index 1d89e3fe021..d4737be08ce 100644 --- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala +++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala @@ -78,7 +78,7 @@ class NoOpPasswordEncoder extends PasswordEncoder { * @param keyLength Key length used for encoding. This should be valid for the specified algorithms. * @param iterations Iteration count used for encoding. * - * The provided `keyFactoryAlgorithm`, 'cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords. + * The provided `keyFactoryAlgorithm`, `cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords. * The values used for encoding are stored along with the encoded password and the stored values are used for decoding. * */ diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala index 824c7dcfc28..1e77624a9fb 100644 --- a/core/src/main/scala/kafka/utils/Throttler.scala +++ b/core/src/main/scala/kafka/utils/Throttler.scala @@ -54,7 +54,7 @@ class Throttler(@volatile var desiredRatePerSec: Double, def maybeThrottle(observed: Double): Unit = { val msPerSec = TimeUnit.SECONDS.toMillis(1) val nsPerSec = TimeUnit.SECONDS.toNanos(1) - val currentDesiredRatePerSec = desiredRatePerSec; + val currentDesiredRatePerSec = desiredRatePerSec meter.mark(observed.toLong) lock synchronized { @@ -83,7 +83,7 @@ class Throttler(@volatile var desiredRatePerSec: Double, } def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = { - desiredRatePerSec = updatedDesiredRatePerSec; + desiredRatePerSec = updatedDesiredRatePerSec } } diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala index 10586317f65..8f3ae49b7aa 100644 --- a/core/src/main/scala/kafka/utils/ToolsUtils.scala +++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala @@ -32,7 +32,7 @@ object ToolsUtils { val validHostPort = hostPorts.filter { hostPortData => org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null } - val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length + val isValid = !validHostPort.isEmpty && validHostPort.length == hostPorts.length if (!isValid) CommandLineUtils.printUsageAndExit(parser, "Please provide valid host:port like host1:9091,host2:9092\n ") } diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala index 462a7436bb1..54490e3dd65 100755 --- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala +++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -161,7 +161,7 @@ class VerifiableProperties(val props: Properties) extends Logging { } } - def getBoolean(name: String) = getString(name).toBoolean + def getBoolean(name: String): Boolean = getString(name).toBoolean /** * Get a string property, or, if no such property is defined, return the given default value