Browse Source

KAFKA-15492: Upgrade and enable spotbugs when building with Java 21 (#14533)

Spotbugs was temporarily disabled as part of KAFKA-15485 to support Kafka build with JDK 21. This PR upgrades the spotbugs version to 4.8.0 which adds support for JDK 21 and enables it's usage on build again.

Reviewers: Divij Vaidya <diviv@amazon.com>
pull/12095/merge
Ismael Juma 11 months ago committed by GitHub
parent
commit
4cf86c5d2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      build.gradle
  2. 6
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
  3. 3
      clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java
  4. 4
      core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
  5. 2
      core/src/main/scala/kafka/server/PartitionMetadataFile.scala
  6. 5
      generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
  7. 2
      gradle/dependencies.gradle
  8. 42
      gradle/spotbugs-exclude.xml
  9. 2
      raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
  10. 5
      raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
  11. 2
      server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
  12. 2
      streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java

18
build.gradle

@ -235,10 +235,7 @@ subprojects {
apply plugin: 'java-library' apply plugin: 'java-library'
apply plugin: 'checkstyle' apply plugin: 'checkstyle'
apply plugin: "com.github.spotbugs"
// spotbugs doesn't support Java 21 yet
if (!JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_21))
apply plugin: "com.github.spotbugs"
// We use the shadow plugin for the jmh-benchmarks module and the `-all` jar can get pretty large, so // We use the shadow plugin for the jmh-benchmarks module and the `-all` jar can get pretty large, so
// don't publish it // don't publish it
@ -708,15 +705,12 @@ subprojects {
test.dependsOn('checkstyleMain', 'checkstyleTest') test.dependsOn('checkstyleMain', 'checkstyleTest')
// spotbugs doesn't support Java 21 yet spotbugs {
if (!JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_21)) { toolVersion = versions.spotbugs
spotbugs { excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
toolVersion = versions.spotbugs ignoreFailures = false
excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
ignoreFailures = false
}
test.dependsOn('spotbugsMain')
} }
test.dependsOn('spotbugsMain')
tasks.withType(com.github.spotbugs.snom.SpotBugsTask) { tasks.withType(com.github.spotbugs.snom.SpotBugsTask) {
reports { reports {

6
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

@ -52,7 +52,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
private static final Logger log = LoggerFactory.getLogger(AbstractStickyAssignor.class); private static final Logger log = LoggerFactory.getLogger(AbstractStickyAssignor.class);
public static final int DEFAULT_GENERATION = -1; public static final int DEFAULT_GENERATION = -1;
public int maxGeneration = DEFAULT_GENERATION; private int maxGeneration = DEFAULT_GENERATION;
private PartitionMovements partitionMovements; private PartitionMovements partitionMovements;
@ -118,6 +118,10 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
} }
public int maxGeneration() {
return maxGeneration;
}
/** /**
* Returns true iff all consumers have an identical subscription. Also fills out the passed in * Returns true iff all consumers have an identical subscription. Also fills out the passed in
* {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions, * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions,

3
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java

@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.Reader; import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Enumeration; import java.util.Enumeration;
@ -101,7 +102,7 @@ public class FileConfigProvider implements ConfigProvider {
// visible for testing // visible for testing
protected Reader reader(String path) throws IOException { protected Reader reader(String path) throws IOException {
return Files.newBufferedReader(Paths.get(path)); return Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8);
} }
public void close() { public void close() {

4
core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala

@ -18,10 +18,10 @@
package kafka.metrics package kafka.metrics
import java.nio.file.{Files, Paths} import java.nio.file.{Files, Paths}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.slf4j.Logger import org.slf4j.Logger
import java.nio.charset.StandardCharsets
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
/** /**
@ -68,7 +68,7 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg
try { try {
cachedReadBytes = -1 cachedReadBytes = -1
cachedWriteBytes = -1 cachedWriteBytes = -1
val lines = Files.readAllLines(path).asScala val lines = Files.readAllLines(path, StandardCharsets.UTF_8).asScala
lines.foreach(line => { lines.foreach(line => {
if (line.startsWith(READ_BYTES_PREFIX)) { if (line.startsWith(READ_BYTES_PREFIX)) {
cachedReadBytes = line.substring(READ_BYTES_PREFIX.size).toLong cachedReadBytes = line.substring(READ_BYTES_PREFIX.size).toLong

2
core/src/main/scala/kafka/server/PartitionMetadataFile.scala

@ -138,7 +138,7 @@ class PartitionMetadataFile(val file: File,
def read(): PartitionMetadata = { def read(): PartitionMetadata = {
lock synchronized { lock synchronized {
try { try {
val reader = Files.newBufferedReader(path) val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
try { try {
val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader) val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader)
partitionBuffer.read() partitionBuffer.read()

5
generator/src/main/java/org/apache/kafka/message/MessageGenerator.java

@ -27,6 +27,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream; import java.nio.file.DirectoryStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -238,7 +239,7 @@ public final class MessageGenerator {
String name = generator.outputName(spec) + JAVA_SUFFIX; String name = generator.outputName(spec) + JAVA_SUFFIX;
outputFileNames.add(name); outputFileNames.add(name);
Path outputPath = Paths.get(outputDir, name); Path outputPath = Paths.get(outputDir, name);
try (BufferedWriter writer = Files.newBufferedWriter(outputPath)) { try (BufferedWriter writer = Files.newBufferedWriter(outputPath, StandardCharsets.UTF_8)) {
generator.generateAndWrite(spec, writer); generator.generateAndWrite(spec, writer);
} }
} }
@ -252,7 +253,7 @@ public final class MessageGenerator {
for (TypeClassGenerator typeClassGenerator : typeClassGenerators) { for (TypeClassGenerator typeClassGenerator : typeClassGenerators) {
outputFileNames.add(typeClassGenerator.outputName()); outputFileNames.add(typeClassGenerator.outputName());
Path factoryOutputPath = Paths.get(outputDir, typeClassGenerator.outputName()); Path factoryOutputPath = Paths.get(outputDir, typeClassGenerator.outputName());
try (BufferedWriter writer = Files.newBufferedWriter(factoryOutputPath)) { try (BufferedWriter writer = Files.newBufferedWriter(factoryOutputPath, StandardCharsets.UTF_8)) {
typeClassGenerator.generateAndWrite(writer); typeClassGenerator.generateAndWrite(writer);
} }
} }

2
gradle/dependencies.gradle

@ -161,7 +161,7 @@ versions += [
scoverage: "1.9.3", scoverage: "1.9.3",
slf4j: "1.7.36", slf4j: "1.7.36",
snappy: "1.1.10.5", snappy: "1.1.10.5",
spotbugs: "4.7.3", spotbugs: "4.8.0",
zinc: "1.9.2", zinc: "1.9.2",
zookeeper: "3.8.2", zookeeper: "3.8.2",
zstd: "1.5.5-6" zstd: "1.5.5-6"

42
gradle/spotbugs-exclude.xml

@ -46,6 +46,21 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/> <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
</Match> </Match>
<Match>
<!-- Disable warnings about constructors that throw exceptions.
CT_CONSTRUCTOR_THROW: Be wary of letting constructors throw exceptions -->
<Bug pattern="CT_CONSTRUCTOR_THROW"/>
</Match>
<Match>
<!-- Disable warnings about identifiers that conflict with standard library identifiers.
PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_CLASS_NAMES: Do not reuse public identifiers from JSL as class name
PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_FIELD_NAMES: Do not reuse public identifiers from JSL as field name
PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_METHOD_NAMES: Do not reuse public identifiers from JSL as method name
-->
<Bug pattern="PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_CLASS_NAMES,PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_FIELD_NAMES,PI_DO_NOT_REUSE_PUBLIC_IDENTIFIERS_METHOD_NAMES"/>
</Match>
<Match> <Match>
<!-- Spotbugs tends to work a little bit better with Java than with Scala. We suppress <!-- Spotbugs tends to work a little bit better with Java than with Scala. We suppress
some categories of bug reports when using Scala, since spotbugs generates huge some categories of bug reports when using Scala, since spotbugs generates huge
@ -70,7 +85,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
NP_ALWAYS_NULL: Null pointer dereference NP_ALWAYS_NULL: Null pointer dereference
MS_CANNOT_BE_FINAL: Field isn't final and can't be protected from malicious code MS_CANNOT_BE_FINAL: Field isn't final and can't be protected from malicious code
IC_INIT_CIRCULARITY: Initialization circularity IC_INIT_CIRCULARITY: Initialization circularity
SE_NO_SUITABLE_CONSTRUCTOR: Class is Serializable but its superclass doesn't define a void constructor --> SE_NO_SUITABLE_CONSTRUCTOR: Class is Serializable but its superclass doesn't define a void constructor
PA_PUBLIC_MUTABLE_OBJECT_ATTRIBUTE: Mutable object-type field is public -->
<Source name="~.*\.scala" /> <Source name="~.*\.scala" />
<Or> <Or>
<Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE"/> <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE"/>
@ -94,9 +110,16 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="SE_NO_SUITABLE_CONSTRUCTOR"/> <Bug pattern="SE_NO_SUITABLE_CONSTRUCTOR"/>
<Bug pattern="DMI_RANDOM_USED_ONLY_ONCE"/> <Bug pattern="DMI_RANDOM_USED_ONLY_ONCE"/>
<Bug pattern="SSD_DO_NOT_USE_INSTANCE_LOCK_ON_SHARED_STATIC_DATA"/> <Bug pattern="SSD_DO_NOT_USE_INSTANCE_LOCK_ON_SHARED_STATIC_DATA"/>
<Bug pattern="PA_PUBLIC_MUTABLE_OBJECT_ATTRIBUTE"/>
</Or> </Or>
</Match> </Match>
<Match>
<!-- disabled due to too many false positives
RV_EXCEPTION_NOT_THROWN: Exception created and dropped rather than thrown -->
<Bug pattern="RV_EXCEPTION_NOT_THROWN"/>
</Match>
<!-- false positive in Java 11, related to https://github.com/spotbugs/spotbugs/issues/756 but more complex --> <!-- false positive in Java 11, related to https://github.com/spotbugs/spotbugs/issues/756 but more complex -->
<Match> <Match>
<Class name="org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream"/> <Class name="org.apache.kafka.common.compress.KafkaLZ4BlockOutputStream"/>
@ -270,6 +293,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Package name="~org\.apache\.kafka\.jmh\..*\.jmh_generated"/> <Package name="~org\.apache\.kafka\.jmh\..*\.jmh_generated"/>
</Match> </Match>
<Match>
<!-- JMH benchmarks often have fields that are modified by the framework.
SS_SHOULD_BE_STATIC: Unread field: should this field be static? -->
<Package name="~org\.apache\.kafka\.jmh\..*"/>
<Bug pattern="SS_SHOULD_BE_STATIC"/>
</Match>
<Match> <Match>
<!-- Suppress warnings about generated schema arrays. --> <!-- Suppress warnings about generated schema arrays. -->
<Or> <Or>
@ -367,6 +397,16 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
</Or> </Or>
</Match> </Match>
<Match>
<!-- Public mutable fields are intentional in some Streams classes.
PA_PUBLIC_PRIMITIVE_ATTRIBUTE: Primitive field is public -->
<Or>
<Class name="org.apache.kafka.streams.kstream.Materialized"/>
<Class name="org.apache.kafka.streams.state.internals.LeftOrRightValueDeserializer"/>
</Or>
<Bug pattern="PA_PUBLIC_PRIMITIVE_ATTRIBUTE"/>
</Match>
<Match> <Match>
<!-- Suppress a spurious warning about locks not being released on all paths. <!-- Suppress a spurious warning about locks not being released on all paths.
This happens because there is an 'if' statement that checks if we have the lock before This happens because there is an 'if' statement that checks if we have the lock before

2
raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java

@ -71,7 +71,7 @@ public class FileBasedStateStore implements QuorumStateStore {
} }
private QuorumStateData readStateFromFile(File file) { private QuorumStateData readStateFromFile(File file) {
try (final BufferedReader reader = Files.newBufferedReader(file.toPath())) { try (final BufferedReader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)) {
final String line = reader.readLine(); final String line = reader.readLine();
if (line == null) { if (line == null) {
throw new EOFException("File ended prematurely."); throw new EOFException("File ended prematurely.");

5
raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java

@ -29,10 +29,11 @@ import java.util.OptionalInt;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
public class ReplicatedCounter implements RaftClient.Listener<Integer> { public class ReplicatedCounter implements RaftClient.Listener<Integer> {
private static final int SNAPSHOT_DELAY_IN_RECORDS = 10;
private final int nodeId; private final int nodeId;
private final Logger log; private final Logger log;
private final RaftClient<Integer> client; private final RaftClient<Integer> client;
private final int snapshotDelayInRecords = 10;
private int committed = 0; private int committed = 0;
private int uncommitted = 0; private int uncommitted = 0;
@ -107,7 +108,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
} }
log.debug("Counter incremented from {} to {}", initialCommitted, committed); log.debug("Counter incremented from {} to {}", initialCommitted, committed);
if (lastOffsetSnapshotted + snapshotDelayInRecords < lastCommittedOffset) { if (lastOffsetSnapshotted + SNAPSHOT_DELAY_IN_RECORDS < lastCommittedOffset) {
log.debug( log.debug(
"Generating new snapshot with committed offset {} and epoch {} since the previous snapshot includes {}", "Generating new snapshot with committed offset {} and epoch {} since the previous snapshot includes {}",
lastCommittedOffset, lastCommittedOffset,

2
server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java

@ -89,7 +89,7 @@ public class CheckpointFile<T> {
public List<T> read() throws IOException { public List<T> read() throws IOException {
synchronized (lock) { synchronized (lock) {
try (BufferedReader reader = Files.newBufferedReader(absolutePath)) { try (BufferedReader reader = Files.newBufferedReader(absolutePath, StandardCharsets.UTF_8)) {
CheckpointReadBuffer<T> checkpointBuffer = new CheckpointReadBuffer<>(absolutePath.toString(), reader, version, formatter); CheckpointReadBuffer<T> checkpointBuffer = new CheckpointReadBuffer<>(absolutePath.toString(), reader, version, formatter);
return checkpointBuffer.read(); return checkpointBuffer.read();
} }

2
streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java

@ -148,7 +148,7 @@ public class OffsetCheckpoint {
*/ */
public Map<TopicPartition, Long> read() throws IOException { public Map<TopicPartition, Long> read() throws IOException {
synchronized (lock) { synchronized (lock) {
try (final BufferedReader reader = Files.newBufferedReader(file.toPath())) { try (final BufferedReader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)) {
final int version = readInt(reader); final int version = readInt(reader);
switch (version) { switch (version) {
case 0: case 0:

Loading…
Cancel
Save