
KAFKA-5013; Fail the build when findbugs fails

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2805 from cmccabe/KAFKA-5013
Colin P. Mccabe authored 8 years ago; committed by Ismael Juma
Commit 256f8d5662
9 changed files (changed line counts in parentheses):

  1. build.gradle (9)
  2. core/src/main/scala/kafka/coordinator/PidMetadata.scala (31)
  3. core/src/main/scala/kafka/log/ProducerIdMapping.scala (13)
  4. core/src/main/scala/kafka/server/DelayedOperation.scala (10)
  5. core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala (2)
  6. core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (3)
  7. core/src/main/scala/kafka/utils/ZkUtils.scala (8)
  8. gradle/findbugs-exclude.xml (71)
  9. jenkins.sh (2)

build.gradle (9 lines changed)

@@ -321,8 +321,9 @@ subprojects {
   findbugs {
     toolVersion = "3.0.1"
     excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
-    ignoreFailures = true
+    ignoreFailures = false
   }
+  test.dependsOn('findbugsMain')
   tasks.withType(FindBugs) {
     reports {
@@ -352,6 +353,12 @@ subprojects {
 }
+gradle.taskGraph.whenReady { taskGraph ->
+  taskGraph.getAllTasks().findAll { it.name.contains('findbugsScoverage') || it.name.contains('findbugsTest') }.each { task ->
+    task.enabled = false
+  }
+}
 def fineTuneEclipseClasspathFile(eclipse, project) {
   eclipse.classpath.file {
     beforeMerged { cp ->
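
With ignoreFailures = false, a findbugs violation now fails the corresponding Gradle task instead of merely producing a report, and the new test.dependsOn('findbugsMain') line makes the analysis run before the unit tests so CI fails fast. The taskGraph hook disables the findbugsTest and findbugsScoverage task variants, presumably so only main sources need to be kept warning-clean.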

core/src/main/scala/kafka/coordinator/PidMetadata.scala (31 lines changed)

@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator
-
-import kafka.utils.nonthreadsafe
-
-@nonthreadsafe
-private[coordinator] class PidMetadata(val pid: Long) {
-
-  /* current epoch number of the PID */
-  var epoch: Short = 0
-
-  override def equals(that: Any): Boolean = that match {
-    case other: PidMetadata => pid == other.pid && epoch == other.epoch
-    case _ => false
-  }
-}
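
The deleted class overrode equals without a matching hashCode, the kind of mismatch findbugs reports as HE_EQUALS_NO_HASHCODE because hash-based collections then misbehave. A minimal sketch of the conventional fix, had the class been kept rather than deleted (illustrative only, not part of the patch):

private[coordinator] class PidMetadata(val pid: Long) {
  var epoch: Short = 0

  override def equals(that: Any): Boolean = that match {
    case other: PidMetadata => pid == other.pid && epoch == other.epoch
    case _ => false
  }

  // equals and hashCode must agree: objects that compare equal must hash equally
  override def hashCode(): Int = 31 * pid.hashCode + epoch.hashCode
}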

core/src/main/scala/kafka/log/ProducerIdMapping.scala (13 lines changed)

@@ -221,7 +221,7 @@ class ProducerIdMapping(val config: LogConfig,
   import ProducerIdMapping._
   val snapDir: File = new File(snapParentDir, DirnamePrefix)
-  snapDir.mkdir()
+  Files.createDirectories(snapDir.toPath)
   private val pidMap = mutable.Map[Long, ProducerIdEntry]()
   private var lastMapOffset = 0L
@@ -255,13 +255,16 @@ class ProducerIdMapping(val config: LogConfig,
           loaded = true
         } catch {
           case e: CorruptSnapshotException =>
-            error(s"Snapshot file at ${file} is corrupt: ${e.getMessage}")
-            file.delete()
+            error(s"Snapshot file at $file is corrupt: ${e.getMessage}")
+            try Files.delete(file.toPath)
+            catch {
+              case e: IOException => error(s"Failed to delete corrupt snapshot file $file", e)
+            }
         }
       case None =>
         lastSnapOffset = 0L
         lastMapOffset = 0L
-        snapDir.mkdir()
+        Files.createDirectories(snapDir.toPath)
         loaded = true
     }
   }
@@ -350,7 +353,7 @@ class ProducerIdMapping(val config: LogConfig,
       // Get file with the smallest offset
       val toDelete = list.minBy(offsetFromFile)
       // Delete the last
-      toDelete.delete()
+      Files.deleteIfExists(toDelete.toPath)
     }
   }
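
Each java.io.File call whose boolean result was discarded (mkdir, delete) is swapped for its java.nio.file.Files counterpart, which signals failure by throwing IOException instead of returning a value; the silently dropped boolean is exactly what RV_RETURN_VALUE_IGNORED_BAD_PRACTICE polices. A minimal sketch of the difference, using only the JDK:

import java.io.{File, IOException}
import java.nio.file.Files

object FileDeleteSketch {
  def main(args: Array[String]): Unit = {
    val f = new File("/tmp/findbugs-demo")

    f.delete() // returns false on failure; nothing forces callers to look (findbugs flags this)

    try Files.deleteIfExists(f.toPath) // a missing file is fine, but real errors throw
    catch {
      case e: IOException => System.err.println(s"delete failed: $e")
    }
  }
}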

core/src/main/scala/kafka/server/DelayedOperation.scala (10 lines changed)

@@ -127,11 +127,11 @@ object DelayedOperationPurgatory {
 /**
  * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
  */
-class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
-                                                       timeoutTimer: Timer,
-                                                       brokerId: Int = 0,
-                                                       purgeInterval: Int = 1000,
-                                                       reaperEnabled: Boolean = true)
+final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
+                                                             timeoutTimer: Timer,
+                                                             brokerId: Int = 0,
+                                                             purgeInterval: Int = 1000,
+                                                             reaperEnabled: Boolean = true)
   extends Logging with KafkaMetricsGroup {

   /* a list of operation watching keys */
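
The purgatory is made final, plausibly because it starts its expiration-reaper thread from the constructor and findbugs flags that in non-final classes (SC_START_IN_CTOR): a subclass's constructor could still be running, so the thread may observe a partially constructed object. An illustrative reduction of the pattern, not the actual Kafka class:

final class Reaper(intervalMs: Long) {
  private val thread = new Thread(() => {
    try {
      while (true) {
        // ... expire timed-out operations ...
        Thread.sleep(intervalMs)
      }
    } catch {
      case _: InterruptedException => () // shut down cleanly
    }
  })
  thread.setDaemon(true)
  // Starting a thread in a constructor is only safe when no subclass
  // constructor can run afterwards, hence the final modifier.
  thread.start()
}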

core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala (2 lines changed)

@@ -49,7 +49,7 @@ trait LeaderEpochCache {
   */
 class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging {
   private val lock = new ReentrantReadWriteLock()
-  private var epochs: ListBuffer[EpochEntry] = lock.synchronized { ListBuffer(checkpoint.read(): _*) }
+  private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) }
   private var cachedLatestEpoch: Option[Int] = None // epoch which has yet to be assigned to a message

   /**
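
lock.synchronized only locked the ReentrantReadWriteLock object's monitor; it never acquired the read or write lock, a confusion findbugs reports for java.util.concurrent types (JLM_JSR166_UTILCONCURRENT_MONITORENTER). inWriteLock is Kafka's CoreUtils helper; a sketch of the idea, assuming only the JDK:

import java.util.concurrent.locks.ReadWriteLock

object LockSketch {
  // Acquire the write lock, evaluate the body, and always release the lock.
  def inWriteLock[T](lock: ReadWriteLock)(body: => T): T = {
    lock.writeLock().lock()
    try body
    finally lock.writeLock().unlock()
  }
}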

core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (3 lines changed)

@@ -156,7 +156,7 @@ object ConsumerOffsetChecker extends Logging {
       topicPidMap = immutable.Map(zkUtils.getPartitionsForTopics(topicList).toSeq:_*)
       val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
-      val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
+      channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
       debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
       channel.send(OffsetFetchRequest(group, topicPartitions))
@@ -186,6 +186,7 @@ object ConsumerOffsetChecker extends Logging {
         }
       }
       channel.disconnect()
+      channel = null
       println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
       topicList.sorted.foreach {
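
channel changes from a local val to an outer var that is nulled out after disconnect, so cleanup code elsewhere can tell whether the channel is still open. A minimal sketch of the pattern (Channel here is a hypothetical stand-in, not Kafka's BlockingChannel API):

object ChannelSketch {
  trait Channel { def disconnect(): Unit } // hypothetical stand-in

  var channel: Channel = null

  def run(connect: () => Channel): Unit = {
    try {
      channel = connect()
      // ... send requests, read responses ...
    } finally {
      if (channel != null) {
        channel.disconnect()
        channel = null // mark closed so no later path reuses it
      }
    }
  }
}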

core/src/main/scala/kafka/utils/ZkUtils.scala (8 lines changed)

@@ -660,13 +660,9 @@ class ZkUtils(val zkClient: ZkClient,
     val stat = new Stat()
     try {
       val data: String = zkClient.readData(path, stat)
-      if (data == null.asInstanceOf[String])
-        (None, stat.getVersion)
-      else
-        (Some(data), stat.getVersion)
+      (Option(data), stat.getVersion)
     } catch {
-      case _: ZkNoNodeException =>
-        (None, stat.getVersion)
+      case _: ZkNoNodeException => (None, stat.getVersion)
     }
   }
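
Option(data) is the idiomatic collapse of the removed null check: Option(x) is None when x is null and Some(x) otherwise, so both branches of the old if become a single expression. A one-liner demonstration:

object OptionSketch extends App {
  def wrap(data: String): Option[String] = Option(data)

  println(wrap(null))    // None
  println(wrap("hello")) // Some(hello)
}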

gradle/findbugs-exclude.xml (71 lines changed)

@@ -62,17 +62,38 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
        RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE: Redundant nullcheck of value known to be null.
        RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT: Return value of method without side effect is ignored.
        NM_CLASS_NAMING_CONVENTION: Class names should start with an upper case letter.
-       NM_METHOD_NAMING_CONVENTION: Method names should start with a lower case letter. -->
+       NM_METHOD_NAMING_CONVENTION: Method names should start with a lower case letter.
+       EC_NULL_ARG: Call to equals(null)
+       NP_ALWAYS_NULL: Null pointer dereference
+       MS_CANNOT_BE_FINAL: Field isn't final and can't be protected from malicious code -->
     <Source name="~.*\.scala" />
-    <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE,NP_NULL_ON_SOME_PATH,NP_NULL_PARAM_DEREF,SE_BAD_FIELD,DM_STRING_CTOR,DM_NEW_FOR_GETCLASS,ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD,DM_NUMBER_CTOR,RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE,RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE,RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE,RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT,NM_CLASS_NAMING_CONVENTION,NM_METHOD_NAMING_CONVENTION"/>
+    <Or>
+      <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE"/>
+      <Bug pattern="NP_NULL_ON_SOME_PATH"/>
+      <Bug pattern="NP_NULL_PARAM_DEREF"/>
+      <Bug pattern="SE_BAD_FIELD"/>
+      <Bug pattern="DM_STRING_CTOR"/>
+      <Bug pattern="DM_NEW_FOR_GETCLASS"/>
+      <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
+      <Bug pattern="DM_NUMBER_CTOR"/>
+      <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"/>
+      <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"/>
+      <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"/>
+      <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
+      <Bug pattern="NM_CLASS_NAMING_CONVENTION"/>
+      <Bug pattern="NM_METHOD_NAMING_CONVENTION"/>
+      <Bug pattern="EC_NULL_ARG"/>
+      <Bug pattern="NP_ALWAYS_NULL"/>
+      <Bug pattern="MS_CANNOT_BE_FINAL"/>
+    </Or>
   </Match>
   <Match>
     <!-- Add a suppression for KAFKA-4897: LogCleaner#cleanSegments should not ignore failures to delete files.
          TODO: remove this suppression when KAFKA-4897 is fixed. -->
-    <Class name="kafka.log.Cleaner"/>
-    <Method name="cleanSegments"/>
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+    <Package name="kafka.log"/>
+    <Source name="LogCleaner.scala"/>
+    <Bug pattern="RV_RETURN_VALUE_IGNORED,RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
   </Match>
   <Match>
@@ -153,6 +174,15 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
     <Bug pattern="RV_RETURN_VALUE_IGNORED"/>
   </Match>
+  <Match>
+    <!-- Suppress a warning about ignoring the return value of await.
+         This is done intentionally because we use other clues to determine
+         if the wait was cut short. -->
+    <Package name="kafka.log"/>
+    <Source name="LogCleanerManager.scala"/>
+    <Bug pattern="RV_RETURN_VALUE_IGNORED,RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+  </Match>
   <Match>
     <!-- Suppress some warnings about intentional switch statement fallthrough. -->
     <Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>
@@ -251,4 +281,35 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC"/>
   </Match>
+  <Match>
+    <!-- Ignore a warning about synchronizing on an AtomicBoolean -->
+    <Package name="kafka.metrics"/>
+    <Source name="KafkaMetricsReporter.scala"/>
+    <Method name="startReporters"/>
+    <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER"/>
+  </Match>
+  <Match>
+    <!-- Ignore a spurious warning about the types used in
+         AdminClient#deleteRecordsBefore -->
+    <Package name="kafka.admin"/>
+    <Source name="AdminClient.scala"/>
+    <Bug pattern="GC_UNRELATED_TYPES"/>
+  </Match>
+  <Match>
+    <!-- Suppress a spurious warning about an unreleased lock. -->
+    <Class name="kafka.utils.timer.SystemTimer"/>
+    <Method name="add"/>
+    <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
+  </Match>
+  <Match>
+    <!-- Suppress a warning about an intentional infinite loop. -->
+    <Package name="kafka.utils"/>
+    <Source name="Throttler.scala"/>
+    <Method name="main"/>
+    <Bug pattern="IL_INFINITE_LOOP"/>
+  </Match>
 </FindBugsFilter>
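
The patterns gathered into the new <Or> block stay suppressed only for Scala sources (the ~.*\.scala Source filter) because scalac routinely emits bytecode that findbugs, a Java-oriented analyzer, misreads. Idiomatic Scala equality is a typical trigger; a snippet of the sort involved (illustrative, not from the patch):

class Broker(val id: Int) {
  override def equals(that: Any): Boolean = that match {
    case other: Broker => id == other.id
    case _ => false // a null argument lands here; the compiled null tests can look like EC_NULL_ARG or NP_ALWAYS_NULL to findbugs
  }
  override def hashCode(): Int = id
}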

jenkins.sh (2 lines changed)

@@ -17,4 +17,4 @@
 # This script is used for verifying changes in Jenkins. In order to provide faster feedback, the tasks are ordered so
 # that faster tasks are executed in every module before slower tasks (if possible). For example, the unit tests for all
 # the modules are executed before the integration tests.
-./gradlew clean compileJava compileScala compileTestJava compileTestScala checkstyleMain checkstyleTest unitTest integrationTest --no-daemon -Dorg.gradle.project.testLoggingEvents=started,passed,skipped,failed "$@"
+./gradlew clean compileJava compileScala compileTestJava compileTestScala checkstyleMain checkstyleTest findbugsMain unitTest integrationTest --no-daemon -Dorg.gradle.project.testLoggingEvents=started,passed,skipped,failed "$@"
