From 7ba6d7a0b439cd7be7918b77c6b08425d6b37526 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 24 Sep 2023 06:05:12 -0700 Subject: [PATCH] MINOR: Update to Scala 2.13.12 (#14430) It offers a quickfix action for certain errors, includes a number of bug fixes and it introduces a new warning by default (https://github.com/scala/scala/pull/10462). In addition to the scala version bump, we also fix the new compiler warnings and bump the scalafmt version (the previous version failed with the new scala version). Release notes: https://github.com/scala/scala/releases/tag/v2.13.12 Reviewers: Divij Vaidya , Satish Duggana --- LICENSE-binary | 4 ++-- bin/kafka-run-class.sh | 2 +- bin/windows/kafka-run-class.bat | 2 +- checkstyle/.scalafmt.conf | 4 ++-- core/src/main/scala/kafka/log/LogCleaner.scala | 4 ++-- .../scala/other/kafka/ReplicationQuotasTestRig.scala | 7 +++---- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 4 ++-- .../test/scala/unit/kafka/log/TimeIndexTest.scala | 2 +- .../scala/unit/kafka/network/SocketServerTest.scala | 4 ++-- .../scala/unit/kafka/server/ReplicaManagerTest.scala | 12 ++++++------ gradle.properties | 2 +- gradle/dependencies.gradle | 4 ++-- 12 files changed, 25 insertions(+), 26 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 73271b802f2..d453408a079 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -255,9 +255,9 @@ reflections-0.10.2 reload4j-1.2.25 rocksdbjni-7.9.2 scala-collection-compat_2.13-2.10.0 -scala-library-2.13.11 +scala-library-2.13.12 scala-logging_2.13-3.9.4 -scala-reflect-2.13.11 +scala-reflect-2.13.12 scala-java8-compat_2.13-1.0.2 snappy-java-1.1.10.3 swagger-annotations-2.2.8 diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 9ab96d7f2e1..8e66c49391d 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -48,7 +48,7 @@ should_include_file() { base_dir=$(dirname $0)/.. if [ -z "$SCALA_VERSION" ]; then - SCALA_VERSION=2.13.11 + SCALA_VERSION=2.13.12 if [[ -f "$base_dir/gradle.properties" ]]; then SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2` fi diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index 42903fba956..18310057f26 100755 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -27,7 +27,7 @@ set BASE_DIR=%CD% popd IF ["%SCALA_VERSION%"] EQU [""] ( - set SCALA_VERSION=2.13.11 + set SCALA_VERSION=2.13.12 ) IF ["%SCALA_BINARY_VERSION%"] EQU [""] ( diff --git a/checkstyle/.scalafmt.conf b/checkstyle/.scalafmt.conf index a6fae4ab32d..54533046741 100644 --- a/checkstyle/.scalafmt.conf +++ b/checkstyle/.scalafmt.conf @@ -12,11 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -version = 3.5.9 +version = 3.7.14 runner.dialect = scala213 docstrings.style = Asterisk docstrings.wrap = false maxColumn = 120 continuationIndent.defnSite = 2 assumeStandardLibraryStripMargin = true -rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers] \ No newline at end of file +rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers] diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index ff8a687b5ee..454cbaa2b35 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -747,7 +747,7 @@ private[log] class Cleaner(val id: Int, val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata) if (batch.isControlBatch) - discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime + discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= this.currentTime else discardBatchRecords = canDiscardBatch @@ -784,7 +784,7 @@ private[log] class Cleaner(val id: Int, else if (batch.isControlBatch) true else - Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = currentTime) + Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = this.currentTime) } } diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala index dddd33f2d54..584955de691 100644 --- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala +++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala @@ -326,10 +326,9 @@ object ReplicationQuotasTestRig { def append(message: String): Unit = { val stream = Files.newOutputStream(log.toPath, StandardOpenOption.CREATE, StandardOpenOption.APPEND) - new PrintWriter(stream) { - append(message) - close - } + val writer = new PrintWriter(stream) + writer.append(message) + writer.close() } def path(): String = { diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 13870904ca6..c2aa991e38e 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -149,7 +149,7 @@ class LogLoaderTest { val segments = new LogSegments(topicPartition) val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "") val producerStateManager = new ProducerStateManager(topicPartition, logDir, - maxTransactionTimeoutMs, producerStateManagerConfig, time) + this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time) val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time, logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint, leaderEpochCache, producerStateManager) @@ -158,7 +158,7 @@ class LogLoaderTest { offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition, logDirFailureChannel) new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats, - producerIdExpirationCheckIntervalMs, leaderEpochCache, + this.producerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager, None, true) } } diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala index 0ca0474b201..72bbe3cd202 100644 --- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala @@ -117,7 +117,7 @@ class TimeIndexTest { idx = new TimeIndex(idx.file, baseOffset, maxEntries * 12) { override def lastEntry = { val superLastEntry = super.lastEntry - val offset = if (shouldCorruptOffset) baseOffset - 1 else superLastEntry.offset + val offset = if (shouldCorruptOffset) this.baseOffset - 1 else superLastEntry.offset val timestamp = if (shouldCorruptTimestamp) firstEntry.timestamp - 1 else superLastEntry.timestamp new TimestampOffset(timestamp, offset) } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 63759fae871..de89d31e569 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -889,7 +889,7 @@ class SocketServerTest { // except the Acceptor overriding a method to inject the exception override protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = { - new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, false, requestChannel, serverMetrics, credentialProvider, new LogContext(), MemoryPool.NONE, apiVersionManager) { + new DataPlaneAcceptor(this, endPoint, this.config, nodeId, connectionQuotas, time, false, requestChannel, serverMetrics, this.credentialProvider, new LogContext(), MemoryPool.NONE, this.apiVersionManager) { override protected def configureAcceptedSocketChannel(socketChannel: SocketChannel): Unit = { assertEquals(1, connectionQuotas.get(socketChannel.socket.getInetAddress)) throw new IOException("test injected IOException") @@ -2149,7 +2149,7 @@ class SocketServerTest { ) { override def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel) : DataPlaneAcceptor = { - new TestableAcceptor(this, endPoint, config, 0, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, new LogContext, MemoryPool.NONE, apiVersionManager, connectionQueueSize) + new TestableAcceptor(this, endPoint, this.config, 0, connectionQuotas, time, isPrivilegedListener, requestChannel, this.metrics, this.credentialProvider, new LogContext, MemoryPool.NONE, this.apiVersionManager, connectionQueueSize) } def testableSelector: TestableSelector = diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 6f913208320..3663e023dfc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2800,16 +2800,16 @@ class ReplicaManagerTest { threadNamePrefix: Option[String], replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = { val rm = this - new ReplicaFetcherManager(config, rm, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion(), () => 1) { + new ReplicaFetcherManager(this.config, rm, metrics, time, threadNamePrefix, replicationQuotaManager, () => this.metadataCache.metadataVersion(), () => 1) { override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = { - val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " + + val logContext = new LogContext(s"[ReplicaFetcher replicaId=${rm.config.brokerId}, leaderId=${sourceBroker.id}, " + s"fetcherId=$fetcherId] ") val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id) - val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config, - rm, quotaManager.follower, () => config.interBrokerProtocolVersion, () => 1) - new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, rm, - quotaManager.follower, logContext.logPrefix, () => config.interBrokerProtocolVersion) { + val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, rm.config, + rm, quotaManager.follower, () => rm.config.interBrokerProtocolVersion, () => 1) + new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, rm.config, failedPartitions, rm, + quotaManager.follower, logContext.logPrefix, () => rm.config.interBrokerProtocolVersion) { override def doWork(): Unit = { // In case the thread starts before the partition is added by AbstractFetcherManager, // add it here (it's a no-op if already added) diff --git a/gradle.properties b/gradle.properties index a303a314199..06f12850326 100644 --- a/gradle.properties +++ b/gradle.properties @@ -24,7 +24,7 @@ group=org.apache.kafka # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml version=3.7.0-SNAPSHOT -scalaVersion=2.13.11 +scalaVersion=2.13.12 task=build org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC org.gradle.parallel=true diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 50e662a6ad9..7dc1a8d5980 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -28,7 +28,7 @@ ext { // Add Scala version def defaultScala212Version = '2.12.18' -def defaultScala213Version = '2.13.11' +def defaultScala213Version = '2.13.12' if (hasProperty('scalaVersion')) { if (scalaVersion == '2.12') { versions["scala"] = defaultScala212Version @@ -152,7 +152,7 @@ versions += [ // When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now // has the version field as mandatory in its configuration, see // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0. - scalafmt: "3.5.9", + scalafmt: "3.7.14", scalaJava8Compat : "1.0.2", scoverage: "1.9.3", slf4j: "1.7.36",