Browse Source

MINOR: Enable a number of xlint scalac warnings

Update the code where possible to fix the warnings. The unused
warning introduced in Scala 2.12 is quite handy and provides
a reason to compile with Scala 2.12.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3464 from ijuma/scala-xlint
pull/3523/merge
Ismael Juma 7 years ago committed by Jason Gustafson
parent
commit
1685e7112c
  1. 22
      build.gradle
  2. 3
      core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
  3. 2
      core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
  4. 2
      core/src/main/scala/kafka/admin/TopicCommand.scala
  5. 2
      core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
  6. 2
      core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
  7. 1
      core/src/main/scala/kafka/api/FetchRequest.scala
  8. 2
      core/src/main/scala/kafka/api/TopicMetadataRequest.scala
  9. 2
      core/src/main/scala/kafka/cluster/Replica.scala
  10. 1
      core/src/main/scala/kafka/common/TopicAndPartition.scala
  11. 2
      core/src/main/scala/kafka/consumer/ConsumerConnector.scala
  12. 2
      core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
  13. 2
      core/src/main/scala/kafka/consumer/ConsumerIterator.scala
  14. 2
      core/src/main/scala/kafka/consumer/KafkaStream.scala
  15. 1
      core/src/main/scala/kafka/consumer/PartitionAssignor.scala
  16. 20
      core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  17. 2
      core/src/main/scala/kafka/controller/KafkaController.scala
  18. 10
      core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
  19. 2
      core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
  20. 2
      core/src/main/scala/kafka/javaapi/producer/Producer.scala
  21. 6
      core/src/main/scala/kafka/log/AbstractIndex.scala
  22. 2
      core/src/main/scala/kafka/log/LogCleanerManager.scala
  23. 5
      core/src/main/scala/kafka/log/OffsetIndex.scala
  24. 9
      core/src/main/scala/kafka/log/TimeIndex.scala
  25. 4
      core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
  26. 2
      core/src/main/scala/kafka/producer/Producer.scala
  27. 2
      core/src/main/scala/kafka/producer/async/EventHandler.scala
  28. 2
      core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
  29. 21
      core/src/main/scala/kafka/security/auth/Acl.scala
  30. 2
      core/src/main/scala/kafka/server/ClientQuotaManager.scala
  31. 4
      core/src/main/scala/kafka/utils/IteratorTemplate.scala
  32. 4
      core/src/main/scala/kafka/utils/KafkaScheduler.scala
  33. 2
      core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
  34. 2
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
  35. 2
      core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
  36. 28
      core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
  37. 2
      core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
  38. 2
      core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
  39. 2
      core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
  40. 2
      core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
  41. 2
      core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
  42. 13
      core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
  43. 4
      core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
  44. 12
      core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
  45. 2
      core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
  46. 3
      core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
  47. 3
      core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
  48. 4
      core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
  49. 2
      core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
  50. 2
      core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
  51. 4
      core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
  52. 2
      core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
  53. 4
      core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
  54. 2
      core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
  55. 2
      core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
  56. 2
      core/src/test/scala/unit/kafka/integration/FetcherTest.scala
  57. 2
      core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
  58. 2
      core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
  59. 2
      core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
  60. 22
      core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
  61. 14
      core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
  62. 3
      core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
  63. 2
      core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
  64. 2
      core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
  65. 4
      core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
  66. 2
      core/src/test/scala/unit/kafka/message/MessageTest.scala
  67. 2
      core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
  68. 2
      core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
  69. 2
      core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
  70. 2
      core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
  71. 6
      core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
  72. 2
      core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
  73. 2
      core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
  74. 11
      core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
  75. 2
      core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
  76. 2
      core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  77. 2
      core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
  78. 8
      core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
  79. 10
      core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
  80. 2
      core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
  81. 2
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  82. 8
      core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
  83. 16
      core/src/test/scala/unit/kafka/zk/ZKPathTest.scala

22
build.gradle

@ -309,9 +309,29 @@ subprojects { @@ -309,9 +309,29 @@ subprojects {
"-feature",
"-language:postfixOps",
"-language:implicitConversions",
"-language:existentials"
"-language:existentials",
"-Xlint:by-name-right-associative",
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
"-Xlint:nullary-override",
"-Xlint:nullary-unit",
"-Xlint:option-implicit",
"-Xlint:package-object-classes",
"-Xlint:poly-implicit-overload",
"-Xlint:private-shadow",
"-Xlint:stars-align",
"-Xlint:type-parameter-shadow",
"-Xlint:unsound-match",
]
if (versions.baseScala != '2.11') {
scalaCompileOptions.additionalParameters += [
"-Xlint:constant",
"-Xlint:unused"
]
}
configure(scalaCompileOptions.forkOptions) {
memoryMaximumSize = '1g'
jvmArgs = ['-Xss2m'] + maxPermSizeArgs

3
core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala

@ -24,7 +24,6 @@ import kafka.utils.CommandLineUtils @@ -24,7 +24,6 @@ import kafka.utils.CommandLineUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.CommonClientConfigs
import joptsimple._
import org.apache.kafka.common.Node
import scala.util.{Failure, Success}
@ -41,7 +40,7 @@ object BrokerApiVersionsCommand { @@ -41,7 +40,7 @@ object BrokerApiVersionsCommand {
val opts = new BrokerVersionCommandOptions(args)
val adminClient = createAdminClient(opts)
adminClient.awaitBrokers()
var brokerMap = adminClient.listAllBrokerVersionInfo()
val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.foreach { case (broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")

2
core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala

@ -28,8 +28,6 @@ import org.apache.kafka.common.utils.Utils @@ -28,8 +28,6 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.CommonClientConfigs
import joptsimple._
import scala.util.{Failure, Success}
/**
* A command for delete records of the given partitions down to the specified offset.
*/

2
core/src/main/scala/kafka/admin/TopicCommand.scala

@ -366,7 +366,7 @@ object TopicCommand extends Logging { @@ -366,7 +366,7 @@ object TopicCommand extends Logging {
}
}
def askToProceed: Unit = {
def askToProceed(): Unit = {
println("Are you sure you want to continue? [y/n]")
if (!Console.readLine().equalsIgnoreCase("y")) {
println("Ending your session")

2
core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala

@ -17,8 +17,6 @@ @@ -17,8 +17,6 @@
package kafka.admin
import java.util.concurrent.LinkedBlockingQueue
import joptsimple.OptionParser
import org.I0Itec.zkclient.exception.ZkException
import kafka.utils.{CommandLineUtils, Logging, ZkUtils}

2
core/src/main/scala/kafka/api/ControlledShutdownResponse.scala

@ -44,7 +44,7 @@ case class ControlledShutdownResponse(correlationId: Int, @@ -44,7 +44,7 @@ case class ControlledShutdownResponse(correlationId: Int,
error: Errors = Errors.NONE,
partitionsRemaining: Set[TopicAndPartition])
extends RequestOrResponse() {
def sizeInBytes(): Int ={
def sizeInBytes: Int = {
var size =
4 /* correlation id */ +
2 /* error code */ +

1
core/src/main/scala/kafka/api/FetchRequest.scala

@ -22,7 +22,6 @@ import kafka.api.ApiUtils._ @@ -22,7 +22,6 @@ import kafka.api.ApiUtils._
import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
import kafka.network.RequestChannel
import kafka.message.MessageSet
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import java.util

2
core/src/main/scala/kafka/api/TopicMetadataRequest.scala

@ -47,7 +47,7 @@ case class TopicMetadataRequest(versionId: Short, @@ -47,7 +47,7 @@ case class TopicMetadataRequest(versionId: Short,
topics.foreach(topic => writeShortString(buffer, topic))
}
def sizeInBytes(): Int = {
def sizeInBytes: Int = {
2 + /* version id */
4 + /* correlation id */
shortStringLength(clientId) + /* client id */

2
core/src/main/scala/kafka/cluster/Replica.scala

@ -22,8 +22,6 @@ import kafka.utils.Logging @@ -22,8 +22,6 @@ import kafka.utils.Logging
import kafka.server.{LogOffsetMetadata, LogReadResult}
import kafka.common.KafkaException
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
import org.apache.kafka.common.utils.Time
class Replica(val brokerId: Int,

1
core/src/main/scala/kafka/common/TopicAndPartition.scala

@ -1,7 +1,6 @@ @@ -1,7 +1,6 @@
package kafka.common
import kafka.cluster.{Partition, Replica}
import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
/**

2
core/src/main/scala/kafka/consumer/ConsumerConnector.scala

@ -79,7 +79,7 @@ trait ConsumerConnector { @@ -79,7 +79,7 @@ trait ConsumerConnector {
/**
* KAFKA-1743: This method added for backward compatibility.
*/
def commitOffsets
def commitOffsets()
/**
* Commit offsets from an external offsets map.

2
core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala

@ -45,7 +45,6 @@ class ConsumerFetcherManager(private val consumerIdString: String, @@ -45,7 +45,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(Time.SYSTEM.milliseconds),
config.clientId, config.numConsumerFetchers) {
private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] = null
private var cluster: Cluster = null
private val noLeaderPartitionSet = new mutable.HashSet[TopicPartition]
private val lock = new ReentrantLock
private val cond = lock.newCondition()
@ -126,7 +125,6 @@ class ConsumerFetcherManager(private val consumerIdString: String, @@ -126,7 +125,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
inLock(lock) {
partitionMap = topicInfos.map(tpi => (new TopicPartition(tpi.topic, tpi.partitionId), tpi)).toMap
this.cluster = cluster
noLeaderPartitionSet ++= topicInfos.map(tpi => new TopicPartition(tpi.topic, tpi.partitionId))
cond.signalAll()
}

2
core/src/main/scala/kafka/consumer/ConsumerIterator.scala

@ -17,7 +17,7 @@ @@ -17,7 +17,7 @@
package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging, CoreUtils}
import kafka.utils.{IteratorTemplate, Logging}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference

2
core/src/main/scala/kafka/consumer/KafkaStream.scala

@ -37,7 +37,7 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], @@ -37,7 +37,7 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
/**
* Create an iterator over messages in the stream.
*/
def iterator(): ConsumerIterator[K,V] = iter
def iterator: ConsumerIterator[K,V] = iter
/**
* This method clears the queue being iterated during the consumer rebalancing. This is mainly

1
core/src/main/scala/kafka/consumer/PartitionAssignor.scala

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package kafka.consumer
import org.I0Itec.zkclient.ZkClient
import kafka.common.TopicAndPartition
import kafka.utils.{Pool, CoreUtils, ZkUtils, Logging}

20
core/src/main/scala/kafka/controller/ControllerChannelManager.scala

@ -502,33 +502,17 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient, @@ -502,33 +502,17 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null)
class Callbacks private (var leaderAndIsrResponseCallback: AbstractResponse => Unit = null,
var updateMetadataResponseCallback: AbstractResponse => Unit = null,
var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = null)
class Callbacks private (var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit)
object Callbacks {
class CallbackBuilder {
var leaderAndIsrResponseCbk: AbstractResponse => Unit = null
var updateMetadataResponseCbk: AbstractResponse => Unit = null
var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null
def leaderAndIsrCallback(cbk: AbstractResponse => Unit): CallbackBuilder = {
leaderAndIsrResponseCbk = cbk
this
}
def updateMetadataCallback(cbk: AbstractResponse => Unit): CallbackBuilder = {
updateMetadataResponseCbk = cbk
this
}
def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): CallbackBuilder = {
stopReplicaResponseCbk = cbk
this
}
def build: Callbacks = {
new Callbacks(leaderAndIsrResponseCbk, updateMetadataResponseCbk, stopReplicaResponseCbk)
}
def build: Callbacks = new Callbacks(stopReplicaResponseCbk)
}
}

2
core/src/main/scala/kafka/controller/KafkaController.scala

@ -613,7 +613,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met @@ -613,7 +613,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
def incrementControllerEpoch() = {
try {
var newControllerEpoch = controllerContext.epoch + 1
val newControllerEpoch = controllerContext.epoch + 1
val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
if(!updateSucceeded)

10
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala

@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.WriteTxnMarkersResponse
import scala.collection.mutable
import collection.JavaConversions._
import scala.collection.JavaConverters._
class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnStateManager: TransactionStateManager,
@ -41,7 +41,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, @@ -41,7 +41,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
val correlation = requestHeader.correlationId
trace(s"Cancelled $api request $requestHeader with correlation id $correlation due to node ${response.destination} being disconnected")
for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
val transactionalId = txnIdAndMarker.txnId
val txnMarker = txnIdAndMarker.txnMarkerEntry
@ -82,7 +82,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, @@ -82,7 +82,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnMarker.producerEpoch,
txnMarker.transactionResult,
txnMarker.coordinatorEpoch,
txnMarker.partitions.toSet)
txnMarker.partitions.asScala.toSet)
}
}
}
@ -91,7 +91,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, @@ -91,7 +91,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
val transactionalId = txnIdAndMarker.txnId
val txnMarker = txnIdAndMarker.txnMarkerEntry
val errors = writeTxnMarkerResponse.errors(txnMarker.producerId)
@ -132,7 +132,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, @@ -132,7 +132,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
abortSending = true
} else {
txnMetadata synchronized {
for ((topicPartition: TopicPartition, error: Errors) <- errors) {
for ((topicPartition, error) <- errors.asScala) {
error match {
case Errors.NONE =>
txnMetadata.removePartition(topicPartition)

2
core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala

@ -39,7 +39,7 @@ class TopicMetadataRequest(val versionId: Short, @@ -39,7 +39,7 @@ class TopicMetadataRequest(val versionId: Short,
def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
def sizeInBytes: Int = underlying.sizeInBytes()
def sizeInBytes: Int = underlying.sizeInBytes
override def toString: String = {
describe(true)

2
core/src/main/scala/kafka/javaapi/producer/Producer.scala

@ -48,5 +48,5 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for @@ -48,5 +48,5 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
* Close API to close the producer pool connections to all Kafka brokers. Also closes
* the zookeeper client connection if one exists
*/
def close = underlying.close
def close() = underlying.close()
}

6
core/src/main/scala/kafka/log/AbstractIndex.scala

@ -37,8 +37,8 @@ import scala.math.ceil @@ -37,8 +37,8 @@ import scala.math.ceil
* @param baseOffset the base offset of the segment that this index is corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean)
extends Logging {
abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
val maxIndexSize: Int = -1, val writable: Boolean) extends Logging {
protected def entrySize: Int
@ -109,7 +109,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon @@ -109,7 +109,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
/* Windows won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS)
forceUnmap(mmap);
forceUnmap(mmap)
try {
raf.setLength(roundedNewSize)
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)

2
core/src/main/scala/kafka/log/LogCleanerManager.scala

@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantLock @@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.common.LogCleaningAbortedException
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.TopicPartition

5
core/src/main/scala/kafka/log/OffsetIndex.scala

@ -48,8 +48,9 @@ import kafka.common.InvalidOffsetException @@ -48,8 +48,9 @@ import kafka.common.InvalidOffsetException
* All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
* storage format.
*/
class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize, writable) {
// Avoid shadowing mutable `file` in AbstractIndex
class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {
override def entrySize = 8

9
core/src/main/scala/kafka/log/TimeIndex.scala

@ -49,11 +49,9 @@ import org.apache.kafka.common.record.RecordBatch @@ -49,11 +49,9 @@ import org.apache.kafka.common.record.RecordBatch
* No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
*
*/
class TimeIndex(file: File,
baseOffset: Long,
maxIndexSize: Int = -1,
writable: Boolean = true)
extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize, writable) with Logging {
// Avoid shadowing mutable file in AbstractIndex
class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
extends AbstractIndex[Long, Long](_file, baseOffset, maxIndexSize, writable) with Logging {
override def entrySize = 12
@ -206,5 +204,4 @@ class TimeIndex(file: File, @@ -206,5 +204,4 @@ class TimeIndex(file: File,
"Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
" bytes which is not positive or not a multiple of 12.")
}
}

4
core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala

@ -40,7 +40,7 @@ trait KafkaMetricsGroup extends Logging { @@ -40,7 +40,7 @@ trait KafkaMetricsGroup extends Logging {
* @param tags Additional attributes which mBean will have.
* @return Sanitized metric name object.
*/
private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
private def metricName(name: String, tags: scala.collection.Map[String, String]) = {
val klass = this.getClass
val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
@ -52,7 +52,7 @@ trait KafkaMetricsGroup extends Logging { @@ -52,7 +52,7 @@ trait KafkaMetricsGroup extends Logging {
}
private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String]) = {
val nameBuilder: StringBuilder = new StringBuilder
nameBuilder.append(group)

2
core/src/main/scala/kafka/producer/Producer.scala

@ -132,7 +132,7 @@ class Producer[K,V](val config: ProducerConfig, @@ -132,7 +132,7 @@ class Producer[K,V](val config: ProducerConfig,
KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
if (producerSendThread != null)
producerSendThread.shutdown
eventHandler.close
eventHandler.close()
info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
}
}

2
core/src/main/scala/kafka/producer/async/EventHandler.scala

@ -33,5 +33,5 @@ trait EventHandler[K,V] { @@ -33,5 +33,5 @@ trait EventHandler[K,V] {
/**
* Cleans up and shuts down the event handler
*/
def close
def close(): Unit
}

2
core/src/main/scala/kafka/producer/async/ProducerSendThread.scala

@ -53,7 +53,7 @@ class ProducerSendThread[K,V](val threadName: String, @@ -53,7 +53,7 @@ class ProducerSendThread[K,V](val threadName: String,
}
}
def shutdown = {
def shutdown(): Unit = {
info("Begin shutting down ProducerSendThread")
queue.put(shutdownCommand)
shutdownLatch.await

21
core/src/main/scala/kafka/security/auth/Acl.scala

@ -56,21 +56,20 @@ object Acl { @@ -56,21 +56,20 @@ object Acl {
if (aclJson == null || aclJson.isEmpty)
return collection.immutable.Set.empty[Acl]
var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
Json.parseFull(aclJson).foreach { m =>
Json.parseFull(aclJson).toSet[Any].flatMap { m =>
val aclMap = m.asInstanceOf[Map[String, Any]]
//the acl json version.
require(aclMap(VersionKey) == CurrentVersion)
val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
aclSet.foreach(item => {
val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
val host: String = item(HostsKey).asInstanceOf[String]
acls += new Acl(principal, permissionType, host, operation)
})
val aclSet = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
aclSet.map { item =>
val principal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
val permissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
val operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
val host = item(HostsKey).asInstanceOf[String]
new Acl(principal, permissionType, host, operation)
}
}
acls.toSet
}
def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {

2
core/src/main/scala/kafka/server/ClientQuotaManager.scala

@ -215,7 +215,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -215,7 +215,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// Compute the delay
val clientQuotaEntity = clientSensors.quotaEntity
val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).round.toInt
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt
clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))

4
core/src/main/scala/kafka/utils/IteratorTemplate.scala

@ -42,12 +42,12 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T @@ -42,12 +42,12 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T
}
def peek(): T = {
if(!hasNext())
if(!hasNext)
throw new NoSuchElementException()
nextItem
}
def hasNext(): Boolean = {
def hasNext: Boolean = {
if(state == FAILED)
throw new IllegalStateException("Iterator is in failed state")
state match {

4
core/src/main/scala/kafka/utils/KafkaScheduler.scala

@ -127,8 +127,8 @@ class KafkaScheduler(val threads: Int, @@ -127,8 +127,8 @@ class KafkaScheduler(val threads: Int,
}
}
private def ensureRunning = {
if(!isStarted)
private def ensureRunning(): Unit = {
if (!isStarted)
throw new IllegalStateException("Kafka scheduler is not running.")
}
}

2
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala

@ -30,7 +30,7 @@ import org.junit.Test @@ -30,7 +30,7 @@ import org.junit.Test
class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
@Test(timeout=120000)
def checkBrokerApiVersionCommandOutput() {

2
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -364,7 +364,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { @@ -364,7 +364,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
client.close()
}
override def generateConfigs() = {
override def generateConfigs = {
val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach { config =>

2
core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala

@ -59,7 +59,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { @@ -59,7 +59,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
override def generateConfigs() = {
override def generateConfigs = {
FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, serverConfig))
}

28
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala

@ -187,14 +187,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @@ -187,14 +187,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* Tests the ability of producing and consuming with the appropriate ACLs set.
*/
@Test
def testProduceConsumeViaAssign {
def testProduceConsumeViaAssign(): Unit = {
setAclsAndProduce()
consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head, numRecords)
}
@Test
def testProduceConsumeViaSubscribe {
def testProduceConsumeViaSubscribe(): Unit = {
setAclsAndProduce()
consumers.head.subscribe(List(topic).asJava)
consumeRecords(this.consumers.head, numRecords)
@ -215,12 +215,12 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @@ -215,12 +215,12 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* isn't set.
*/
@Test(expected = classOf[TimeoutException])
def testNoProduceWithoutDescribeAcl {
def testNoProduceWithoutDescribeAcl(): Unit = {
sendRecords(numRecords, tp)
}
@Test
def testNoProduceWithDescribeAcl {
def testNoProduceWithDescribeAcl(): Unit = {
AclCommand.main(describeAclArgs)
servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
@ -239,7 +239,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @@ -239,7 +239,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* ACL set.
*/
@Test(expected = classOf[KafkaException])
def testNoConsumeWithoutDescribeAclViaAssign {
def testNoConsumeWithoutDescribeAclViaAssign(): Unit = {
noConsumeWithoutDescribeAclSetup
consumers.head.assign(List(tp).asJava)
// the exception is expected when the consumer attempts to lookup offsets
@ -247,14 +247,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @@ -247,14 +247,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
@Test(expected = classOf[TimeoutException])
def testNoConsumeWithoutDescribeAclViaSubscribe {
def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
noConsumeWithoutDescribeAclSetup
consumers.head.subscribe(List(topic).asJava)
// this should timeout since the consumer will not be able to fetch any metadata for the topic
consumeRecords(this.consumers.head, timeout = 3000)
}
private def noConsumeWithoutDescribeAclSetup {
private def noConsumeWithoutDescribeAclSetup(): Unit = {
AclCommand.main(produceAclArgs)
AclCommand.main(groupAclArgs)
servers.foreach { s =>
@ -270,13 +270,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @@ -270,13 +270,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
}
}
/**
* Tests that a consumer fails to consume messages without the appropriate
* ACL set.
*/
@Test
def testNoConsumeWithDescribeAclViaAssign {
def testNoConsumeWithDescribeAclViaAssign(): Unit = {
noConsumeWithDescribeAclSetup
consumers.head.assign(List(tp).asJava)
@ -290,7 +286,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @@ -290,7 +286,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
@Test
def testNoConsumeWithDescribeAclViaSubscribe {
def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
noConsumeWithDescribeAclSetup
consumers.head.subscribe(List(topic).asJava)
@ -303,7 +299,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @@ -303,7 +299,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
}
private def noConsumeWithDescribeAclSetup {
private def noConsumeWithDescribeAclSetup(): Unit = {
AclCommand.main(produceAclArgs)
AclCommand.main(groupAclArgs)
servers.foreach { s =>
@ -318,7 +314,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @@ -318,7 +314,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* ACL set.
*/
@Test
def testNoGroupAcl {
def testNoGroupAcl(): Unit = {
AclCommand.main(produceAclArgs)
servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)

2
core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala

@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness { @@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
val topicAndPartition = new TopicAndPartition(topic, part)
this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.api.EndToEndClusterIdTest$MockBrokerMetricsReporter")
override def generateConfigs() = {
override def generateConfigs = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach(_.putAll(serverConfig))

2
core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala

@ -51,7 +51,7 @@ class ProducerBounceTest extends KafkaServerTestHarness { @@ -51,7 +51,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
//
// Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
// a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
override def generateConfigs() = {
override def generateConfigs = {
FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
.map(KafkaConfig.fromProps(_, overridingProps))
}

2
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala

@ -49,7 +49,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { @@ -49,7 +49,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
def generateConfigs() =
def generateConfigs =
TestUtils.createBrokerConfigs(numServers, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps))
private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = null

2
core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala

@ -35,7 +35,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa @@ -35,7 +35,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString)
def generateConfigs() =
def generateConfigs =
(0 until numServers) map { node =>
TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString))
} map (KafkaConfig.fromProps(_, overridingProps))

2
core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala

@ -53,7 +53,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { @@ -53,7 +53,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
* the second one connects ok, but fails to consume messages due to the ACL.
*/
@Test(timeout = 15000)
def testTwoConsumersWithDifferentSaslCredentials {
def testTwoConsumersWithDifferentSaslCredentials(): Unit = {
setAclsAndProduce()
val consumer1 = consumers.head

13
core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala

@ -22,13 +22,12 @@ import java.util.Properties @@ -22,13 +22,12 @@ import java.util.Properties
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.{Ignore, Test}
import org.junit.Test
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import org.junit.Assert._
@ -67,7 +66,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { @@ -67,7 +66,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
//
// Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
// a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
override def generateConfigs() = {
override def generateConfigs = {
FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
.map(KafkaConfig.fromProps(_, overridingProps))
}
@ -105,7 +104,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { @@ -105,7 +104,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
!shouldAbort), new ErrorLoggingCallback(outputTopic, record.key, record.value, true))
}
trace(s"Sent ${records.size} messages. Committing offsets.")
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup)
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumerGroup)
if (shouldAbort) {
trace(s"Committed offsets. Aborting transaction of ${records.size} messages.")
@ -150,7 +149,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { @@ -150,7 +149,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = groupId,
securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
consumer.subscribe(topics)
consumer.subscribe(topics.asJava)
consumer
}

4
core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala

@ -30,8 +30,8 @@ import org.junit.Test @@ -30,8 +30,8 @@ import org.junit.Test
class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
.map(KafkaConfig.fromProps(_, new Properties()))
override def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
@Test
def testCommaSeparatedRegex(): Unit = {

12
core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala

@ -64,7 +64,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @@ -64,7 +64,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
def testTopicDoesNotExist {
def testTopicDoesNotExist(): Unit = {
try {
AdminUtils.addPartitions(zkUtils, "Blah", 1)
fail("Topic should not exist")
@ -74,7 +74,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @@ -74,7 +74,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
def testWrongReplicaCount {
def testWrongReplicaCount(): Unit = {
try {
AdminUtils.addPartitions(zkUtils, topic1, 2, "0:1,0:1:2")
fail("Add partitions should fail")
@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
def testIncrementPartitions {
def testIncrementPartitions(): Unit = {
AdminUtils.addPartitions(zkUtils, topic1, 3)
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
@ -111,7 +111,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @@ -111,7 +111,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
def testManualAssignmentOfReplicas {
def testManualAssignmentOfReplicas(): Unit = {
AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3")
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
@ -139,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @@ -139,7 +139,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
def testReplicaPlacementAllServers {
def testReplicaPlacementAllServers(): Unit = {
AdminUtils.addPartitions(zkUtils, topic3, 7)
// read metadata from a broker and verify the new topic partitions exist
@ -166,7 +166,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { @@ -166,7 +166,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
@Test
def testReplicaPlacementPartialServers {
def testReplicaPlacementPartialServers(): Unit = {
AdminUtils.addPartitions(zkUtils, topic2, 3)
// read metadata from a broker and verify the new topic partitions exist

2
core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala

@ -28,7 +28,7 @@ import kafka.integration.KafkaServerTestHarness @@ -28,7 +28,7 @@ import kafka.integration.KafkaServerTestHarness
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
class DeleteConsumerGroupTest extends KafkaServerTestHarness {
def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
def generateConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
@Test
def testGroupWideDeleteInZK() {

3
core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala

@ -47,7 +47,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { @@ -47,7 +47,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
private var consumerGroupExecutor: ConsumerGroupExecutor = _
// configure the servers and clients
override def generateConfigs() = {
override def generateConfigs = {
TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
KafkaConfig.fromProps(props)
}
@ -274,6 +274,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { @@ -274,6 +274,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
}
}
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0")
private def createOldConsumer(): Unit = {
val consumerProps = new Properties
consumerProps.setProperty("group.id", group)

3
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala

@ -40,7 +40,8 @@ class ListConsumerGroupTest extends KafkaServerTestHarness { @@ -40,7 +40,8 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
val props = new Properties
// configure the servers and clients
override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
override def generateConfigs =
TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
@Before
override def setUp() {

4
core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala

@ -30,7 +30,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { @@ -30,7 +30,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
var servers: Seq[KafkaServer] = null
val topicName = "my-topic"
val delayMs = 1000
def zkUpdateDelay = {Thread.sleep(delayMs)}
def zkUpdateDelay(): Unit = Thread.sleep(delayMs)
@Before
override def setUp() {
@ -49,7 +49,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { @@ -49,7 +49,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}
@Test
def shouldMoveSinglePartition {
def shouldMoveSinglePartition(): Unit = {
//Given a single replica on server 100
startBrokers(Seq(100, 101))
val partition = 0

2
core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala

@ -133,7 +133,7 @@ class ReassignPartitionsCommandTest extends Logging { @@ -133,7 +133,7 @@ class ReassignPartitionsCommandTest extends Logging {
case "topic2" =>
assertEquals("0:101,0:102", configChange.get(LeaderReplicationThrottledReplicasProp))
assertEquals("0:100", configChange.get(FollowerReplicationThrottledReplicasProp))
case _ => fail("Unexpected topic $topic")
case _ => fail(s"Unexpected topic $topic")
}
calls += 1
}

2
core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala

@ -67,7 +67,7 @@ class BrokerEndPointTest extends Logging { @@ -67,7 +67,7 @@ class BrokerEndPointTest extends Logging {
}
@Test
def testFromJsonV2 {
def testFromJsonV2(): Unit = {
val brokerInfoStr = """{
"version":2,
"host":"localhost",

4
core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala

@ -18,12 +18,12 @@ package kafka.common @@ -18,12 +18,12 @@ package kafka.common
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestUtils, ZkUtils}
import kafka.utils.TestUtils
import org.junit.Test
class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
override def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
override def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test
def testProcessNotification() {

2
core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala

@ -38,7 +38,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness { @@ -38,7 +38,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
val numNodes = 1
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
val messages = new mutable.HashMap[Int, Seq[Message]]
val topic = "topic"

4
core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

@ -45,8 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging @@ -45,8 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
.map(KafkaConfig.fromProps(_, overridingProps))
override def generateConfigs =
TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
val group = "group1"
val consumer0 = "consumer0"

2
core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala

@ -40,7 +40,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { @@ -40,7 +40,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
val metrics = new Metrics()
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect)
override def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect)
.map(KafkaConfig.fromProps(_, overridingProps))
@After

2
core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala

@ -31,7 +31,7 @@ import org.junit.Assert._ @@ -31,7 +31,7 @@ import org.junit.Assert._
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
val topic = "test_topic"
val group = "default_group"

2
core/src/test/scala/unit/kafka/integration/FetcherTest.scala

@ -32,7 +32,7 @@ import kafka.utils.TestUtils @@ -32,7 +32,7 @@ import kafka.utils.TestUtils
@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
class FetcherTest extends KafkaServerTestHarness {
val numNodes = 1
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
def generateConfigs = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
val topic = "topic"

2
core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala

@ -48,7 +48,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with @@ -48,7 +48,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
@volatile private var running = true
override def generateConfigs() = TestUtils.createBrokerConfigs(nodesNum, zkConnect)
override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, zkConnect)
.map(KafkaConfig.fromProps(_, overridingProps))
@Before

2
core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala

@ -27,7 +27,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness { @@ -27,7 +27,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
@Test
def testDefaultKafkaConfig() {

2
core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala

@ -42,7 +42,7 @@ import org.apache.kafka.common.TopicPartition @@ -42,7 +42,7 @@ import org.apache.kafka.common.TopicPartition
class PrimitiveApiTest extends ProducerConsumerTestHarness {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test
def testFetchRequestCanProperlySerialize() {

22
core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala

@ -17,14 +17,12 @@ @@ -17,14 +17,12 @@
package kafka.integration
import java.io.File
import kafka.admin.AdminUtils
import kafka.api.TopicMetadataResponse
import kafka.client.ClientUtils
import kafka.cluster.BrokerEndPoint
import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.Errors
@ -59,7 +57,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @@ -59,7 +57,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testBasicTopicMetadata {
def testBasicTopicMetadata(): Unit = {
// create topic
val topic = "test"
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
@ -77,7 +75,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @@ -77,7 +75,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testGetAllTopicMetadata {
def testGetAllTopicMetadata(): Unit = {
// create topic
val topic1 = "testGetAllTopicMetadata1"
val topic2 = "testGetAllTopicMetadata2"
@ -102,7 +100,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @@ -102,7 +100,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testAutoCreateTopic {
def testAutoCreateTopic(): Unit = {
// auto create topic
val topic = "testAutoCreateTopic"
var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
@ -129,7 +127,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @@ -129,7 +127,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testAutoCreateTopicWithInvalidReplication {
def testAutoCreateTopicWithInvalidReplication(): Unit = {
val adHocProps = createBrokerConfig(2, zkConnect)
// Set default replication higher than the number of live brokers
adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
@ -152,7 +150,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @@ -152,7 +150,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testAutoCreateTopicWithCollision {
def testAutoCreateTopicWithCollision(): Unit = {
// auto create topic
val topic1 = "testAutoCreate_Topic"
val topic2 = "testAutoCreate.Topic"
@ -212,7 +210,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @@ -212,7 +210,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testIsrAfterBrokerShutDownAndJoinsBack {
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
val numBrokers = 2 //just 2 brokers are enough for the test
// start adHoc brokers
@ -260,12 +258,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @@ -260,12 +258,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testAliveBrokerListWithNoTopics {
def testAliveBrokerListWithNoTopics(): Unit = {
checkMetadata(Seq(server1), 1)
}
@Test
def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup(): Unit = {
adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
checkMetadata(adHocServers, numConfigs - 1)
@ -278,7 +276,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { @@ -278,7 +276,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
@Test
def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
def testAliveBrokersListWithNoTopicsAfterABrokerShutdown(): Unit = {
adHocServers = adHocConfigs.map(p => createServer(p))
checkMetadata(adHocServers, numConfigs)

14
core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala

@ -102,7 +102,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -102,7 +102,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
def testUncleanLeaderElectionEnabled {
def testUncleanLeaderElectionEnabled(): Unit = {
// enable unclean leader election
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
@ -116,7 +116,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -116,7 +116,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@Test
@Ignore // Should be re-enabled after KAFKA-3096 is fixed
def testUncleanLeaderElectionDisabled {
def testUncleanLeaderElectionDisabled(): Unit = {
// unclean leader election is disabled by default
startBrokers(Seq(configProps1, configProps2))
@ -127,7 +127,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -127,7 +127,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
def testUncleanLeaderElectionEnabledByTopicOverride {
def testUncleanLeaderElectionEnabledByTopicOverride(): Unit = {
// disable unclean leader election globally, but enable for our specific test topic
configProps1.put("unclean.leader.election.enable", "false")
configProps2.put("unclean.leader.election.enable", "false")
@ -144,7 +144,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -144,7 +144,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@Test
@Ignore // Should be re-enabled after KAFKA-3096 is fixed
def testCleanLeaderElectionDisabledByTopicOverride {
def testCleanLeaderElectionDisabledByTopicOverride(): Unit = {
// enable unclean leader election globally, but disable for our specific test topic
configProps1.put("unclean.leader.election.enable", "true")
configProps2.put("unclean.leader.election.enable", "true")
@ -160,7 +160,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -160,7 +160,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
def testUncleanLeaderElectionInvalidTopicOverride {
def testUncleanLeaderElectionInvalidTopicOverride(): Unit = {
startBrokers(Seq(configProps1))
// create topic with an invalid value for unclean leader election
@ -172,7 +172,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -172,7 +172,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
}
def verifyUncleanLeaderElectionEnabled {
def verifyUncleanLeaderElectionEnabled(): Unit = {
// wait until leader is elected
val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
@ -205,7 +205,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -205,7 +205,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
assertEquals(List("first", "third"), consumeAllMessages(topic))
}
def verifyUncleanLeaderElectionDisabled {
def verifyUncleanLeaderElectionDisabled(): Unit = {
// wait until leader is elected
val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
debug("Leader for " + topic + " is elected to be: %s".format(leaderId))

3
core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala

@ -45,7 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging @@ -45,7 +45,8 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
def generateConfigs =
TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
val group = "group1"
val consumer1 = "consumer1"

2
core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala

@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { @@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
@Test
def testWrittenEqualsRead {
def testWrittenEqualsRead(): Unit = {
val messageSet = createMessageSet(messages)
assertEquals(messages.toSeq, messageSet.asScala.map(m => m.message))
}

2
core/src/test/scala/unit/kafka/log/TimeIndexTest.scala

@ -35,7 +35,7 @@ class TimeIndexTest extends JUnitSuite { @@ -35,7 +35,7 @@ class TimeIndexTest extends JUnitSuite {
@Before
def setup() {
this.idx = new TimeIndex(file = nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12)
this.idx = new TimeIndex(nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12)
}
@After

4
core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala

@ -30,13 +30,13 @@ class TransactionIndexTest extends JUnitSuite { @@ -30,13 +30,13 @@ class TransactionIndexTest extends JUnitSuite {
val offset = 0L
@Before
def setup: Unit = {
def setup(): Unit = {
file = TestUtils.tempFile()
index = new TransactionIndex(offset, file)
}
@After
def teardown: Unit = {
def teardown(): Unit = {
index.close()
}

2
core/src/test/scala/unit/kafka/message/MessageTest.scala

@ -57,7 +57,7 @@ class MessageTest extends JUnitSuite { @@ -57,7 +57,7 @@ class MessageTest extends JUnitSuite {
}
@Test
def testFieldValues {
def testFieldValues(): Unit = {
for(v <- messages) {
// check payload
if(v.payload == null) {

2
core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

@ -44,7 +44,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @@ -44,7 +44,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val overridingProps = new Properties
overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
def generateConfigs() =
def generateConfigs =
TestUtils.createBrokerConfigs(numNodes, zkConnect, enableDeleteTopic=true).map(KafkaConfig.fromProps(_, overridingProps))
val nMessages = 2

2
core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala

@ -62,7 +62,7 @@ class AsyncProducerTest { @@ -62,7 +62,7 @@ class AsyncProducerTest {
Thread.sleep(500)
}
def close {}
def close(): Unit = ()
}
val props = new Properties()

2
core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala

@ -37,7 +37,7 @@ import org.junit.Assert._ @@ -37,7 +37,7 @@ import org.junit.Assert._
class SyncProducerTest extends KafkaServerTestHarness {
private val messageBytes = new Array[Byte](2)
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
private def produceRequest(topic: String,
partition: Int,

2
core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala

@ -166,7 +166,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { @@ -166,7 +166,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
* Tests the migration tool when chroot is being used.
*/
@Test
def testChroot {
def testChroot(): Unit = {
val zkUrl = zkConnect + "/kafka"
zkUtils.createPersistentPath("/kafka")
val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false)

6
core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala

@ -37,7 +37,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness { @@ -37,7 +37,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
}
@Test
def testBrokerAdvertiseHostNameAndPortToZK: Unit = {
def testBrokerAdvertiseHostNameAndPortToZK(): Unit = {
val advertisedHostName = "routable-host1"
val advertisedPort = 1234
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
@ -54,7 +54,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness { @@ -54,7 +54,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName.value)
}
def testBrokerAdvertiseListenersToZK: Unit = {
def testBrokerAdvertiseListenersToZK(): Unit = {
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
props.put("advertised.listeners", "PLAINTEXT://routable-listener:3334")
servers += TestUtils.createServer(KafkaConfig.fromProps(props))
@ -68,7 +68,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness { @@ -68,7 +68,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName)
}
def testBrokerAdvertiseListenersWithCustomNamesToZK: Unit = {
def testBrokerAdvertiseListenersWithCustomNamesToZK(): Unit = {
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
props.put("listeners", "INTERNAL://:0,EXTERNAL://:0")
props.put("advertised.listeners", "EXTERNAL://external-listener:9999,INTERNAL://internal-listener:10999")

2
core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala

@ -25,7 +25,7 @@ import org.junit.Test @@ -25,7 +25,7 @@ import org.junit.Test
class ApiVersionsTest {
@Test
def testApiVersions {
def testApiVersions(): Unit = {
val apiVersions = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions
assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length)

2
core/src/test/scala/unit/kafka/server/BaseRequestTest.scala

@ -39,7 +39,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { @@ -39,7 +39,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
// If required, override properties by mutating the passed Properties object
protected def propertyOverrides(properties: Properties) {}
def generateConfigs() = {
def generateConfigs = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
enableControlledShutdown = false, enableDeleteTopic = true,
interBrokerSecurityProtocol = Some(securityProtocol),

11
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala

@ -26,14 +26,13 @@ import org.easymock.EasyMock @@ -26,14 +26,13 @@ import org.easymock.EasyMock
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
import kafka.common._
import kafka.admin.{AdminOperationException, AdminUtils}
import org.apache.kafka.common.TopicPartition
import scala.collection.Map
class DynamicConfigChangeTest extends KafkaServerTestHarness {
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test
def testConfigChange() {
@ -174,7 +173,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @@ -174,7 +173,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
def testProcessNotification {
def testProcessNotification(): Unit = {
val props = new Properties()
props.put("a.b", "10")
@ -230,7 +229,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @@ -230,7 +229,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
def shouldParseReplicationQuotaProperties {
def shouldParseReplicationQuotaProperties(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
val props: Properties = new Properties()
@ -243,7 +242,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @@ -243,7 +242,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
def shouldParseWildcardReplicationQuotaProperties {
def shouldParseWildcardReplicationQuotaProperties(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
val props: Properties = new Properties()
@ -258,7 +257,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @@ -258,7 +257,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
def shouldParseReplicationQuotaReset {
def shouldParseReplicationQuotaReset(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
val props: Properties = new Properties()

2
core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala

@ -37,7 +37,7 @@ import scala.collection.JavaConverters._ @@ -37,7 +37,7 @@ import scala.collection.JavaConverters._
class EdgeCaseRequestTest extends KafkaServerTestHarness {
def generateConfigs() = {
def generateConfigs = {
val props = TestUtils.createBrokerConfig(1, zkConnect)
props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
List(KafkaConfig.fromProps(props))

2
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

@ -121,7 +121,7 @@ class KafkaConfigTest { @@ -121,7 +121,7 @@ class KafkaConfigTest {
}
@Test
def testLogRetentionValid {
def testLogRetentionValid(): Unit = {
val props1 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)

2
core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala

@ -65,7 +65,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { @@ -65,7 +65,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
}
@Test
def testLeaderElectionAndEpoch {
def testLeaderElectionAndEpoch(): Unit = {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0

8
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala

@ -99,7 +99,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { @@ -99,7 +99,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Test
def testHWCheckpointNoFailuresSingleLogSegment {
def testHWCheckpointNoFailuresSingleLogSegment(): Unit = {
val numMessages = 2L
sendMessages(numMessages.toInt)
@ -116,7 +116,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { @@ -116,7 +116,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Test
def testHWCheckpointWithFailuresSingleLogSegment {
def testHWCheckpointWithFailuresSingleLogSegment(): Unit = {
var leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
assertEquals(0L, hwFile1.read.getOrElse(topicPartition, 0L))
@ -167,7 +167,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { @@ -167,7 +167,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Test
def testHWCheckpointNoFailuresMultipleLogSegments {
def testHWCheckpointNoFailuresMultipleLogSegments(): Unit = {
sendMessages(20)
val hw = 20L
// give some time for follower 1 to record leader HW of 600
@ -183,7 +183,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { @@ -183,7 +183,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Test
def testHWCheckpointWithFailuresMultipleLogSegments {
def testHWCheckpointWithFailuresMultipleLogSegments(): Unit = {
var leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
sendMessages(2)

10
core/src/test/scala/unit/kafka/server/ServerStartupTest.scala

@ -35,7 +35,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { @@ -35,7 +35,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
def testBrokerCreatesZKChroot {
def testBrokerCreatesZKChroot(): Unit = {
val brokerId = 0
val zookeeperChroot = "/kafka-chroot-for-unittest"
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
@ -48,7 +48,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { @@ -48,7 +48,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
def testConflictBrokerStartupWithSamePort {
def testConflictBrokerStartupWithSamePort(): Unit = {
// Create and start first broker
val brokerId1 = 0
val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect)
@ -67,7 +67,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { @@ -67,7 +67,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
def testConflictBrokerRegistration {
def testConflictBrokerRegistration(): Unit = {
// Try starting a broker with the a conflicting broker id.
// This shouldn't affect the existing broker registration.
@ -90,7 +90,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { @@ -90,7 +90,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
def testBrokerSelfAware {
def testBrokerSelfAware(): Unit = {
val brokerId = 0
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
server = TestUtils.createServer(KafkaConfig.fromProps(props))
@ -101,7 +101,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { @@ -101,7 +101,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
}
@Test
def testBrokerStateRunningAfterZK {
def testBrokerStateRunningAfterZK(): Unit = {
val brokerId = 0
val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])

2
core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala

@ -263,7 +263,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging { @@ -263,7 +263,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
/**
* Simulates how the Replica Fetcher Thread requests leader offsets for epochs
*/
private class TestFetcherThread(sender: BlockingSend) extends Logging {
private[epoch] class TestFetcherThread(sender: BlockingSend) extends Logging {
def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
val request = new OffsetsForLeaderEpochRequest.Builder(toJavaFormat(partitions))

2
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -448,7 +448,7 @@ object TestUtils extends Logging { @@ -448,7 +448,7 @@ object TestUtils extends Logging {
var cur: Iterator[T] = null
val topIterator = s.iterator
def hasNext() : Boolean = {
def hasNext: Boolean = {
while (true) {
if (cur == null) {
if (topIterator.hasNext)

8
core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala

@ -72,7 +72,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { @@ -72,7 +72,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
}
@Test
def testEphemeralNodeCleanup = {
def testEphemeralNodeCleanup(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled())
@ -100,7 +100,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { @@ -100,7 +100,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
* Tests basic creation
*/
@Test
def testZkWatchedEphemeral = {
def testZkWatchedEphemeral(): Unit = {
testCreation("/zwe-test")
testCreation("/zwe-test-parent/zwe-test")
}
@ -128,7 +128,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { @@ -128,7 +128,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
* session.
*/
@Test
def testOverlappingSessions = {
def testOverlappingSessions(): Unit = {
val path = "/zwe-test"
val zk1 = zkUtils.zkConnection.getZookeeper
@ -156,7 +156,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness { @@ -156,7 +156,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
* Tests if succeeds with znode from the same session
*/
@Test
def testSameSession = {
def testSameSession(): Unit = {
val path = "/zwe-test"
val zk = zkUtils.zkConnection.getZookeeper
// Creates znode for path in the first session

16
core/src/test/scala/unit/kafka/zk/ZKPathTest.scala

@ -30,7 +30,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @@ -30,7 +30,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
@Test
def testCreatePersistentPathThrowsException {
def testCreatePersistentPathThrowsException(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
@ -46,7 +46,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @@ -46,7 +46,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
def testCreatePersistentPath {
def testCreatePersistentPath(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
zkUtils.zkPath.resetNamespaceCheckedState
@ -56,7 +56,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @@ -56,7 +56,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
def testMakeSurePersistsPathExistsThrowsException {
def testMakeSurePersistsPathExistsThrowsException(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
@ -70,7 +70,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @@ -70,7 +70,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
def testMakeSurePersistsPathExists {
def testMakeSurePersistsPathExists(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
zkUtils.zkPath.resetNamespaceCheckedState
@ -80,7 +80,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @@ -80,7 +80,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
def testCreateEphemeralPathThrowsException {
def testCreateEphemeralPathThrowsException(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
try {
@ -94,7 +94,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @@ -94,7 +94,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
def testCreateEphemeralPathExists {
def testCreateEphemeralPathExists(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
zkUtils.zkPath.resetNamespaceCheckedState
@ -104,7 +104,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @@ -104,7 +104,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
def testCreatePersistentSequentialThrowsException {
def testCreatePersistentSequentialThrowsException(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
@ -119,7 +119,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @@ -119,7 +119,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
@Test
def testCreatePersistentSequentialExists {
def testCreatePersistentSequentialExists(): Unit = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
zkUtils.zkPath.resetNamespaceCheckedState

Loading…
Cancel
Save