
MINOR: Update to Scala 2.13.12 (#14430)

Scala 2.13.12 offers a quickfix action for certain compiler errors, includes a number of
bug fixes, and introduces a new warning that is enabled by default (https://github.com/scala/scala/pull/10462).

In addition to the Scala version bump, we also fix the new compiler warnings and
bump the scalafmt version (the previous scalafmt version failed to run with the new Scala version).

Release notes: https://github.com/scala/scala/releases/tag/v2.13.12
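
The new default warning (https://github.com/scala/scala/pull/10462, linked above) appears
to flag identifiers that are both inherited by a nested or anonymous subclass and defined
in an enclosing scope, which is why most of the source changes below qualify such
references explicitly (this.currentTime, rm.config, and so on). A minimal sketch of the
pattern under that reading; the names Metrics, Reporter and buildReporter are invented
for illustration and do not come from the Kafka code base:

    object AmbiguityExample {
      class Metrics {
        protected val config: String = "inherited-config"
      }

      class Reporter {
        private val config: String = "enclosing-config"
        private val self = this // same idea as `val rm = this` in ReplicaManagerTest

        def buildReporter(): Metrics =
          new Metrics {
            // An unqualified `config` here could plausibly mean either the member
            // inherited from Metrics or the one defined in the enclosing Reporter;
            // this is the situation the new warning is meant to catch.
            // val ambiguous = config
            val inherited = this.config // explicitly the Metrics member
            val enclosing = self.config // explicitly the Reporter member, via the alias
          }
      }
    }

Qualifying the reference spells out the intended binding and silences the warning without
changing behaviour when the qualified member is the one that was already being selected.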

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
Ismael Juma (committed via GitHub)
commit 7ba6d7a0b4
12 changed files:
   4  LICENSE-binary
   2  bin/kafka-run-class.sh
   2  bin/windows/kafka-run-class.bat
   2  checkstyle/.scalafmt.conf
   4  core/src/main/scala/kafka/log/LogCleaner.scala
   7  core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
   4  core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
   2  core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
   4  core/src/test/scala/unit/kafka/network/SocketServerTest.scala
  12  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
   2  gradle.properties
   4  gradle/dependencies.gradle

LICENSE-binary
@@ -255,9 +255,9 @@ reflections-0.10.2
 reload4j-1.2.25
 rocksdbjni-7.9.2
 scala-collection-compat_2.13-2.10.0
-scala-library-2.13.11
+scala-library-2.13.12
 scala-logging_2.13-3.9.4
-scala-reflect-2.13.11
+scala-reflect-2.13.12
 scala-java8-compat_2.13-1.0.2
 snappy-java-1.1.10.3
 swagger-annotations-2.2.8

bin/kafka-run-class.sh
@@ -48,7 +48,7 @@ should_include_file() {
 base_dir=$(dirname $0)/..
 if [ -z "$SCALA_VERSION" ]; then
-SCALA_VERSION=2.13.11
+SCALA_VERSION=2.13.12
 if [[ -f "$base_dir/gradle.properties" ]]; then
 SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
 fi

bin/windows/kafka-run-class.bat
@@ -27,7 +27,7 @@ set BASE_DIR=%CD%
 popd
 IF ["%SCALA_VERSION%"] EQU [""] (
-set SCALA_VERSION=2.13.11
+set SCALA_VERSION=2.13.12
 )
 IF ["%SCALA_BINARY_VERSION%"] EQU [""] (

checkstyle/.scalafmt.conf
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-version = 3.5.9
+version = 3.7.14
 runner.dialect = scala213
 docstrings.style = Asterisk
 docstrings.wrap = false

core/src/main/scala/kafka/log/LogCleaner.scala
@@ -747,7 +747,7 @@ private[log] class Cleaner(val id: Int,
 val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
 if (batch.isControlBatch)
-discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= currentTime
+discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= this.currentTime
 else
 discardBatchRecords = canDiscardBatch
@@ -784,7 +784,7 @@ private[log] class Cleaner(val id: Int,
 else if (batch.isControlBatch)
 true
 else
-Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = currentTime)
+Cleaner.this.shouldRetainRecord(map, retainLegacyDeletesAndTxnMarkers, batch, record, stats, currentTime = this.currentTime)
 }
 }

core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -326,10 +326,9 @@ object ReplicationQuotasTestRig {
 def append(message: String): Unit = {
 val stream = Files.newOutputStream(log.toPath, StandardOpenOption.CREATE, StandardOpenOption.APPEND)
-new PrintWriter(stream) {
-append(message)
-close
-}
+val writer = new PrintWriter(stream)
+writer.append(message)
+writer.close()
 }
 def path(): String = {

core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -149,7 +149,7 @@ class LogLoaderTest {
 val segments = new LogSegments(topicPartition)
 val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
 val producerStateManager = new ProducerStateManager(topicPartition, logDir,
-maxTransactionTimeoutMs, producerStateManagerConfig, time)
+this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
 val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
 logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
 leaderEpochCache, producerStateManager)
@@ -158,7 +158,7 @@ class LogLoaderTest {
 offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
 logDirFailureChannel)
 new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
-producerIdExpirationCheckIntervalMs, leaderEpochCache,
+this.producerIdExpirationCheckIntervalMs, leaderEpochCache,
 producerStateManager, None, true)
 }
 }

core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -117,7 +117,7 @@ class TimeIndexTest {
 idx = new TimeIndex(idx.file, baseOffset, maxEntries * 12) {
 override def lastEntry = {
 val superLastEntry = super.lastEntry
-val offset = if (shouldCorruptOffset) baseOffset - 1 else superLastEntry.offset
+val offset = if (shouldCorruptOffset) this.baseOffset - 1 else superLastEntry.offset
 val timestamp = if (shouldCorruptTimestamp) firstEntry.timestamp - 1 else superLastEntry.timestamp
 new TimestampOffset(timestamp, offset)
 }

core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -889,7 +889,7 @@ class SocketServerTest {
 // except the Acceptor overriding a method to inject the exception
 override protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
-new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, false, requestChannel, serverMetrics, credentialProvider, new LogContext(), MemoryPool.NONE, apiVersionManager) {
+new DataPlaneAcceptor(this, endPoint, this.config, nodeId, connectionQuotas, time, false, requestChannel, serverMetrics, this.credentialProvider, new LogContext(), MemoryPool.NONE, this.apiVersionManager) {
 override protected def configureAcceptedSocketChannel(socketChannel: SocketChannel): Unit = {
 assertEquals(1, connectionQuotas.get(socketChannel.socket.getInetAddress))
 throw new IOException("test injected IOException")
@@ -2149,7 +2149,7 @@ class SocketServerTest {
 ) {
 override def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel) : DataPlaneAcceptor = {
-new TestableAcceptor(this, endPoint, config, 0, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, new LogContext, MemoryPool.NONE, apiVersionManager, connectionQueueSize)
+new TestableAcceptor(this, endPoint, this.config, 0, connectionQuotas, time, isPrivilegedListener, requestChannel, this.metrics, this.credentialProvider, new LogContext, MemoryPool.NONE, this.apiVersionManager, connectionQueueSize)
 }
 def testableSelector: TestableSelector =

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2800,16 +2800,16 @@ class ReplicaManagerTest {
 threadNamePrefix: Option[String],
 replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
 val rm = this
-new ReplicaFetcherManager(config, rm, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion(), () => 1) {
+new ReplicaFetcherManager(this.config, rm, metrics, time, threadNamePrefix, replicationQuotaManager, () => this.metadataCache.metadataVersion(), () => 1) {
 override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
-val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " +
+val logContext = new LogContext(s"[ReplicaFetcher replicaId=${rm.config.brokerId}, leaderId=${sourceBroker.id}, " +
 s"fetcherId=$fetcherId] ")
 val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
-val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config,
-rm, quotaManager.follower, () => config.interBrokerProtocolVersion, () => 1)
-new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, rm,
-quotaManager.follower, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
+val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, rm.config,
+rm, quotaManager.follower, () => rm.config.interBrokerProtocolVersion, () => 1)
+new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, rm.config, failedPartitions, rm,
+quotaManager.follower, logContext.logPrefix, () => rm.config.interBrokerProtocolVersion) {
 override def doWork(): Unit = {
 // In case the thread starts before the partition is added by AbstractFetcherManager,
 // add it here (it's a no-op if already added)

gradle.properties
@@ -24,7 +24,7 @@ group=org.apache.kafka
 # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
 # - streams/quickstart/java/pom.xml
 version=3.7.0-SNAPSHOT
-scalaVersion=2.13.11
+scalaVersion=2.13.12
 task=build
 org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
 org.gradle.parallel=true

gradle/dependencies.gradle
@@ -28,7 +28,7 @@ ext {
 // Add Scala version
 def defaultScala212Version = '2.12.18'
-def defaultScala213Version = '2.13.11'
+def defaultScala213Version = '2.13.12'
 if (hasProperty('scalaVersion')) {
 if (scalaVersion == '2.12') {
 versions["scala"] = defaultScala212Version
@@ -152,7 +152,7 @@ versions += [
 // When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now
 // has the version field as mandatory in its configuration, see
 // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
-scalafmt: "3.5.9",
+scalafmt: "3.7.14",
 scalaJava8Compat : "1.0.2",
 scoverage: "1.9.3",
 slf4j: "1.7.36",
