From eed5e68880c0d4fe088fcb9493baecc3db667176 Mon Sep 17 00:00:00 2001 From: Josep Prat Date: Fri, 20 Oct 2023 21:04:04 +0200 Subject: [PATCH] MINOR: Server-Commons cleanup (#14572) MINOR: Server-Commons cleanup Fixes Javadoc and minor issues in the Java files of Server-Commons modules. Javadoc is now formatted as intended by the author of the doc itself. Signed-off-by: Josep Prat Reviewers: Mickael Maison --- .../org/apache/kafka/admin/AdminUtils.java | 8 +++--- .../org/apache/kafka/queue/EventQueue.java | 2 +- .../apache/kafka/queue/KafkaEventQueue.java | 2 +- .../kafka/server/common/CheckpointFile.java | 4 +-- .../kafka/server/common/MetadataVersion.java | 4 +-- .../kafka/server/common/ProducerIdsBlock.java | 2 +- .../AbstractApiMessageSerde.java | 5 ++-- .../serialization/BytesApiMessageSerde.java | 4 +-- .../config/ServerTopicConfigSynonyms.java | 6 ++-- .../fault/ProcessTerminatingFaultHandler.java | 6 ++-- .../server/metrics/KafkaYammerMetrics.java | 2 +- .../kafka/server/mutable/BoundedList.java | 4 +-- .../server/network/EndpointReadyFutures.java | 26 +++++++---------- .../kafka/server/util/CommandLineUtils.java | 4 +-- .../apache/kafka/server/util/FutureUtils.java | 13 ++++----- .../org/apache/kafka/server/util/Json.java | 16 +++++------ .../kafka/server/util/KafkaScheduler.java | 2 +- .../apache/kafka/server/util/Scheduler.java | 4 +-- .../server/util/ThroughputThrottler.java | 8 +++--- .../server/util/TranslatedValueMapView.java | 2 +- .../kafka/server/util/json/JsonArray.java | 2 +- .../kafka/server/util/json/JsonValue.java | 20 ++++++------- .../apache/kafka/server/util/timer/Timer.java | 2 +- .../kafka/server/util/timer/TimingWheel.java | 28 +++++++++---------- .../apache/kafka/timeline/BaseHashTable.java | 26 ++++++++--------- .../org/apache/kafka/timeline/Snapshot.java | 2 +- .../kafka/timeline/SnapshotRegistry.java | 2 +- .../timeline/SnapshottableHashTable.java | 24 ++++++++-------- .../kafka/timeline/TimelineHashMap.java | 4 +-- .../kafka/timeline/TimelineHashSet.java | 4 +-- .../kafka/timeline/TimelineInteger.java | 3 +- .../apache/kafka/timeline/TimelineLong.java | 3 +- .../apache/kafka/timeline/TimelineObject.java | 2 +- 33 files changed, 116 insertions(+), 130 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java index 9504954b76e..ad69e161758 100644 --- a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java +++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java @@ -111,7 +111,7 @@ public class AdminUtils { * brokers, it guarantees that the replica distribution is even across brokers and racks. *

* @return a Map from partition id to replica ids - * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to + * @throws AdminOperationException If rack information is supplied, but it is incomplete, or if it is not possible to * assign each replica to a unique rack. * */ @@ -214,13 +214,13 @@ public class AdminUtils { /** * Given broker and rack information, returns a list of brokers alternated by the rack. Assume * this is the rack and its brokers: - * + *
      * rack1: 0, 1, 2
      * rack2: 3, 4, 5
      * rack3: 6, 7, 8
-     *
+     * 
* This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8 - * + *
* This is essential to make sure that the assignReplicasToBrokers API can use such list and * assign replicas to brokers in a simple round-robin fashion, while ensuring an even * distribution of leader and replica counts on each broker and that replicas are diff --git a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java index 8c4022cee2b..df1fd0987cb 100644 --- a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java +++ b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java @@ -210,7 +210,7 @@ public interface EventQueue extends AutoCloseable { /** * Asynchronously shut down the event queue. - * + *
* No new events will be accepted, and the queue thread will exit after running the existing events. * Deferred events will receive TimeoutExceptions. * diff --git a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java index 2fde8285dab..36284ed7f3e 100644 --- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java +++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java @@ -201,7 +201,7 @@ public final class KafkaEventQueue implements EventQueue { } } - private void handleEvents() throws InterruptedException { + private void handleEvents() { Throwable toDeliver = null; EventContext toRun = null; boolean wasInterrupted = false; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java index 818af8b3c1b..6efbaa136e0 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java @@ -37,13 +37,13 @@ import java.util.Optional; /** * This class represents a utility to capture a checkpoint in a file. It writes down to the file in the below format. - * + *
  * ========= File beginning =========
  * version: int
  * entries-count: int
  * entry-as-string-on-each-line
  * ========= File end ===============
- *
+ * 
* Each entry is represented as a string on each line in the checkpoint file. {@link EntryFormatter} is used * to convert the entry into a string and vice versa. * diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index f7ca2fc242f..8f22ed582a6 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -27,10 +27,10 @@ import org.apache.kafka.common.record.RecordVersion; * This class contains the different Kafka versions. * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves. * This is only for inter-broker communications - when communicating with clients, the client decides on the API version. - * + *
* Note that the ID we initialize for each version is important. * We consider a version newer than another if it is lower in the enum list (to avoid depending on lexicographic order) - * + *
* Since the api protocol may change more than once within the same release and to facilitate people deploying code from * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example, * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a diff --git a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java index c4240018f9b..7c197ce98df 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; /** * Holds a range of Producer IDs used for Transactional and EOS producers. - * + *
* The start and end of the ID block are inclusive. */ public class ProducerIdsBlock { diff --git a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java index c3914ea47b5..a29b6a00fc0 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java @@ -26,11 +26,10 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; /** * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for the given {@code apiKey}. - * + *
* This can be used as the underlying serialization mechanism for records defined with {@link ApiMessage}s. - *

+ *

* Serialization format for the given {@code ApiMessageAndVersion} is below: - *

*
  *     [data_frame_version header message]
  *     header => [api_key version]
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java
index 668bbfb2488..93ad9490755 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java
@@ -25,10 +25,10 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import java.nio.ByteBuffer;
 
 /**
- * This class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa.. This can be used as serialization protocol for any
+ * This class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa. This can be used as serialization protocol for any
  * metadata records derived of {@code ApiMessage}s. It internally uses {@link AbstractApiMessageSerde} for serialization/deserialization
  * mechanism.
- * 

+ *

* Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective * {@code ApiMessage} instance. diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java index c53f8da84f9..320de2db6b9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java @@ -34,16 +34,16 @@ public final class ServerTopicConfigSynonyms { /** * Maps topic configurations to their equivalent broker configurations. - * + *
* Topics can be configured either by setting their dynamic topic configurations, or by * setting equivalent broker configurations. For historical reasons, the equivalent broker * configurations have different names. This table maps each topic configuration to its * equivalent broker configurations. - * + *
* In some cases, the equivalent broker configurations must be transformed before they * can be used. For example, log.roll.hours must be converted to milliseconds before it * can be used as the value of segment.ms. - * + *
* The broker configurations will be used in the order specified here. In other words, if * both the first and the second synonyms are configured, we will use only the value of * the first synonym and ignore the second. diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java index 29ba7b84706..a15c924c98b 100644 --- a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java +++ b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java @@ -66,13 +66,13 @@ final public class ProcessTerminatingFaultHandler implements FaultHandler { /** * Set if halt or exit should be used. - * + *
* When {@code value} is {@code false} {@code Exit.exit} is called, otherwise {@code Exit.halt} is * called. The default value is {@code true}. - * + *
* The default implementation of {@code Exit.exit} calls {@code Runtime.exit} which * blocks on all of the shutdown hooks executing. - * + *
* The default implementation of {@code Exit.halt} calls {@code Runtime.halt} which * forcibly terminates the JVM. */ diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java index 329083350cd..6b2f0a1e38d 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java @@ -38,7 +38,7 @@ import java.util.function.Supplier; /** * This class encapsulates the default yammer metrics registry for Kafka server, * and configures the set of exported JMX metrics for Yammer metrics. - * + *
* KafkaYammerMetrics.defaultRegistry() should always be used instead of Metrics.defaultRegistry() */ public class KafkaYammerMetrics implements Reconfigurable { diff --git a/server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java b/server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java index 9af28d06595..6c5d1ba0d5a 100644 --- a/server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java +++ b/server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java @@ -36,11 +36,11 @@ public class BoundedList implements List { private final List underlying; public static BoundedList newArrayBacked(int maxLength) { - return new BoundedList<>(maxLength, new ArrayList()); + return new BoundedList<>(maxLength, new ArrayList<>()); } public static BoundedList newArrayBacked(int maxLength, int initialCapacity) { - return new BoundedList<>(maxLength, new ArrayList(initialCapacity)); + return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity)); } public BoundedList(int maxLength, List underlying) { diff --git a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java index 1841cba9f19..578079b4bd3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java +++ b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java @@ -71,10 +71,8 @@ public class EndpointReadyFutures { String name, Map> newFutures ) { - newFutures.forEach((endpoint, future) -> { - endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()). - add(new EndpointCompletionStage(name, future)); - }); + newFutures.forEach((endpoint, future) -> endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()). + add(new EndpointCompletionStage(name, future))); return this; } @@ -123,9 +121,7 @@ public class EndpointReadyFutures { addReadinessFutures("authorizerStart", effectiveStartFutures); stages.forEach(stage -> { Map> newReadinessFutures = new HashMap<>(); - info.endpoints().forEach(endpoint -> { - newReadinessFutures.put(endpoint, stage.future); - }); + info.endpoints().forEach(endpoint -> newReadinessFutures.put(endpoint, stage.future)); addReadinessFutures(stage.name, newReadinessFutures); }); return new EndpointReadyFutures(logContext, @@ -200,15 +196,13 @@ public class EndpointReadyFutures { stages.forEach(stage -> stageNames.add(stage.name)); EndpointReadyFuture readyFuture = new EndpointReadyFuture(endpoint, stageNames); newFutures.put(endpoint, readyFuture.future); - stages.forEach(stage -> { - stage.future.whenComplete((__, exception) -> { - if (exception != null) { - readyFuture.failStage(stage.name, exception); - } else { - readyFuture.completeStage(stage.name); - } - }); - }); + stages.forEach(stage -> stage.future.whenComplete((__, exception) -> { + if (exception != null) { + readyFuture.failStage(stage.name, exception); + } else { + readyFuture.completeStage(stage.name); + } + })); }); this.futures = Collections.unmodifiableMap(newFutures); } diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java index 419f0e41c28..e51f6cd321f 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java @@ -238,9 +238,9 @@ public class CommandLineUtils { try { initializeBootstrapProperties(properties, options.has(bootstrapServer) ? - Optional.of(options.valueOf(bootstrapServer).toString()) : Optional.empty(), + Optional.of(options.valueOf(bootstrapServer)) : Optional.empty(), options.has(bootstrapControllers) ? - Optional.of(options.valueOf(bootstrapControllers).toString()) : Optional.empty()); + Optional.of(options.valueOf(bootstrapControllers)) : Optional.empty()); } catch (InitializeBootstrapException e) { printUsageAndExit(parser, e.getMessage()); } diff --git a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java index 642179e81d9..f7a385abf08 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java @@ -80,14 +80,11 @@ public class FutureUtils { CompletableFuture sourceFuture, CompletableFuture destinationFuture ) { - sourceFuture.whenComplete(new BiConsumer() { - @Override - public void accept(T val, Throwable throwable) { - if (throwable != null) { - destinationFuture.completeExceptionally(throwable); - } else { - destinationFuture.complete(val); - } + sourceFuture.whenComplete((BiConsumer) (val, throwable) -> { + if (throwable != null) { + destinationFuture.completeExceptionally(throwable); + } else { + destinationFuture.complete(val); } }); } diff --git a/server-common/src/main/java/org/apache/kafka/server/util/Json.java b/server-common/src/main/java/org/apache/kafka/server/util/Json.java index 3a2922838a3..042e3fff29d 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/Json.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/Json.java @@ -30,7 +30,7 @@ import java.util.Optional; * Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation. */ public final class Json { - private static ObjectMapper mapper = new ObjectMapper(); + private static final ObjectMapper MAPPER = new ObjectMapper(); /** * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON. @@ -48,7 +48,7 @@ public final class Json { * exception. */ public static T parseStringAs(String input, Class clazz) throws JsonProcessingException { - return mapper.readValue(input, clazz); + return MAPPER.readValue(input, clazz); } /** @@ -56,21 +56,21 @@ public final class Json { */ public static Optional parseBytes(byte[] input) throws IOException { try { - return Optional.ofNullable(mapper.readTree(input)).map(JsonValue::apply); + return Optional.ofNullable(MAPPER.readTree(input)).map(JsonValue::apply); } catch (JsonProcessingException e) { return Optional.empty(); } } public static JsonValue tryParseBytes(byte[] input) throws IOException { - return JsonValue.apply(mapper.readTree(input)); + return JsonValue.apply(MAPPER.readTree(input)); } /** * Parse a JSON byte array into a generic type T, or throws a JsonProcessingException in the case of exception. */ public static T parseBytesAs(byte[] input, Class clazz) throws IOException { - return mapper.readValue(input, clazz); + return MAPPER.readValue(input, clazz); } /** @@ -83,7 +83,7 @@ public final class Json { if (input == null || input.isEmpty()) { throw new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty"); } else { - return JsonValue.apply(mapper.readTree(input)); + return JsonValue.apply(MAPPER.readTree(input)); } } @@ -93,7 +93,7 @@ public final class Json { * a jackson-scala dependency). */ public static String encodeAsString(Object obj) throws JsonProcessingException { - return mapper.writeValueAsString(obj); + return MAPPER.writeValueAsString(obj); } /** @@ -102,6 +102,6 @@ public final class Json { * a jackson-scala dependency). */ public static byte[] encodeAsBytes(Object obj) throws JsonProcessingException { - return mapper.writeValueAsBytes(obj); + return MAPPER.writeValueAsBytes(obj); } } diff --git a/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java b/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java index 7115ed2f2ab..f451795f85f 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor - * + *
* It has a pool of kafka-scheduler- threads that do the actual work. */ public class KafkaScheduler implements Scheduler { diff --git a/server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java b/server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java index fde3a979140..5df88960cde 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java @@ -20,14 +20,14 @@ import java.util.concurrent.ScheduledFuture; /** * A scheduler for running jobs - * + *
* This interface controls a job scheduler that allows scheduling either repeating background jobs * that execute periodically or delayed one-time actions that are scheduled in the future. */ public interface Scheduler { /** - * Initialize this scheduler so it is ready to accept scheduling of tasks + * Initialize this scheduler, so it is ready to accept scheduling of tasks */ void startup(); diff --git a/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java b/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java index d826ddda85e..e6a5e6f1290 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java @@ -19,11 +19,11 @@ package org.apache.kafka.server.util; /** * This class helps producers throttle throughput. - * + *
* If targetThroughput >= 0, the resulting average throughput will be approximately * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0, * no throttling will occur. - * + *
* To use, do this between successive send attempts: *
  *     {@code
@@ -64,7 +64,7 @@ public class ThroughputThrottler {
      * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
      *                    messages produced so far if you want to throttle message throughput.
      * @param sendStartMs timestamp of the most recently sent message
-     * @return
+     * @return true if throttling should happen
      */
     public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
         if (this.targetThroughput < 0) {
@@ -78,7 +78,7 @@ public class ThroughputThrottler {
 
     /**
      * Occasionally blocks for small amounts of time to achieve targetThroughput.
-     *
+     * 
* Note that if targetThroughput is 0, this will block extremely aggressively. */ public void throttle() { diff --git a/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java b/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java index 9c85f6caf46..d269550ab83 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java @@ -29,7 +29,7 @@ import java.util.function.Function; /** * A map which presents a lightweight view of another "underlying" map. Values in the * underlying map will be translated by a callback before they are returned. - * + *
* This class is not internally synchronized. (Typically the underlyingMap is treated as * immutable.) */ diff --git a/server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java b/server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java index 1f1c0287eec..9509a3bfcd9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java @@ -43,7 +43,7 @@ public class JsonArray implements JsonValue { Stream nodeStream = StreamSupport.stream( Spliterators.spliteratorUnknownSize(node.elements(), Spliterator.ORDERED), false); - Stream results = nodeStream.map(node -> JsonValue.apply(node)); + Stream results = nodeStream.map(JsonValue::apply); return results.collect(Collectors.toList()).iterator(); } diff --git a/server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java b/server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java index daf3b4c7d61..cd89d0d6063 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java @@ -26,18 +26,16 @@ import java.util.Optional; /** * A simple wrapper over Jackson's JsonNode that enables type safe parsing via the `DecodeJson` type * class. - * + *
* Typical usage would be something like: - * - * {{{ - * val jsonNode: JsonNode = ??? - * val jsonObject = JsonValue(jsonNode).asJsonObject - * val intValue = jsonObject("int_field").to[Int] - * val optionLongValue = jsonObject("option_long_field").to[Option[Long]] - * val mapStringIntField = jsonObject("map_string_int_field").to[Map[String, Int]] - * val seqStringField = jsonObject("seq_string_field").to[Seq[String] - * }}} - * + *

+ * // Given a jsonNode containing a parsed JSON:
+ * JsonObject jsonObject = JsonValue.apply(jsonNode).asJsonObject();
+ * Integer intField = jsonObject.apply("int_field").to(new DecodeJson.DecodeInteger());
+ * Optional optionLongField = jsonObject.apply("option_long_field").to(DecodeJson.decodeOptional(new DecodeJson.DecodeInteger()));
+ * Map mapStringIntField = jsonObject.apply("map_string_int_field").to(DecodeJson.decodeMap(new DecodeJson.DecodeInteger()));
+ * List seqStringField = jsonObject.apply("seq_string_field").to(DecodeJson.decodeList(new DecodeJson.DecodeString()));
+ * 
* The `to` method throws an exception if the value cannot be converted to the requested type. */ diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java index 2771f34a7cd..b3c500f565c 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java @@ -27,7 +27,7 @@ public interface Timer extends AutoCloseable { /** * Advance the internal clock, executing any tasks whose expiration has been * reached within the duration of the passed timeout. - * @param timeoutMs + * @param timeoutMs the time to advance in milliseconds * @return whether or not any tasks were executed */ boolean advanceClock(long timeoutMs) throws InterruptedException; diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java index 59ba67cfa07..43fae38f65e 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java @@ -21,11 +21,11 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Hierarchical Timing Wheels - * + *
* A simple timing wheel is a circular list of buckets of timer tasks. Let u be the time unit. * A timing wheel with size n has n buckets and can hold timer tasks in n * u time interval. * Each bucket holds timer tasks that fall into the corresponding time range. At the beginning, - * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …, + * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …, * the n-th bucket for [u * (n -1), u * n). Every interval of time unit u, the timer ticks and * moved to the next bucket then expire all timer tasks in it. So, the timer never insert a task * into the bucket for the current time since it is already expired. The timer immediately runs @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; * A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer) whereas priority queue * based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, have O(log n) * insert/delete cost. - * + *
* A major drawback of a simple timing wheel is that it assumes that a timer request is within * the time interval of n * u from the current time. If a timer request is out of this interval, * it is an overflow. A hierarchical timing wheel deals with such overflows. It is a hierarchically @@ -47,50 +47,50 @@ import java.util.concurrent.atomic.AtomicInteger; * are then moved to the finer grain wheels or be executed. The insert (start-timer) cost is O(m) * where m is the number of wheels, which is usually very small compared to the number of requests * in the system, and the delete (stop-timer) cost is still O(1). - * + *
* Example * Let's say that u is 1 and n is 3. If the start time is c, * then the buckets at different levels are: - * + *
  * level    buckets
  * 1        [c,c]   [c+1,c+1]  [c+2,c+2]
  * 2        [c,c+2] [c+3,c+5]  [c+6,c+8]
  * 3        [c,c+8] [c+9,c+17] [c+18,c+26]
- *
+ * 
* The bucket expiration is at the time of bucket beginning. * So at time = c+1, buckets [c,c], [c,c+2] and [c,c+8] are expired. * Level 1's clock moves to c+1, and [c+3,c+3] is created. * Level 2 and level3's clock stay at c since their clocks move in unit of 3 and 9, respectively. * So, no new buckets are created in level 2 and 3. - * + *
* Note that bucket [c,c+2] in level 2 won't receive any task since that range is already covered in level 1. * The same is true for the bucket [c,c+8] in level 3 since its range is covered in level 2. * This is a bit wasteful, but simplifies the implementation. - * + *
  * 1        [c+1,c+1]  [c+2,c+2]  [c+3,c+3]
  * 2        [c,c+2]    [c+3,c+5]  [c+6,c+8]
  * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
- *
+ * 
* At time = c+2, [c+1,c+1] is newly expired. * Level 1 moves to c+2, and [c+4,c+4] is created, - * + *
  * 1        [c+2,c+2]  [c+3,c+3]  [c+4,c+4]
  * 2        [c,c+2]    [c+3,c+5]  [c+6,c+8]
  * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
- *
+ * 
* At time = c+3, [c+2,c+2] is newly expired. * Level 2 moves to c+3, and [c+5,c+5] and [c+9,c+11] are created. * Level 3 stay at c. - * + *
  * 1        [c+3,c+3]  [c+4,c+4]  [c+5,c+5]
  * 2        [c+3,c+5]  [c+6,c+8]  [c+9,c+11]
  * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
- *
+ * 
* The hierarchical timing wheels works especially well when operations are completed before they time out. * Even when everything times out, it still has advantageous when there are many items in the timer. * Its insert cost (including reinsert) and delete cost are O(m) and O(1), respectively while priority * queue based timers takes O(log N) for both insert and delete where N is the number of items in the queue. - * + *
* This class is not thread-safe. There should not be any add calls while advanceClock is executing. * It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe. */ diff --git a/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java index 05315466519..783d7e10643 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java @@ -22,14 +22,14 @@ import java.util.List; /** * A hash table which uses separate chaining. - * + *
* In order to optimize memory consumption a bit, the common case where there is * one element per slot is handled by simply placing the element in the slot, * and the case where there are multiple elements is handled by creating an - * array and putting that in the slot. Java is storing type info in memory + * array and putting that in the slot. Java is storing type info in memory * about every object whether we want it or not, so let's get some benefit * out of it. - * + *
* Arrays and null values cannot be inserted. */ @SuppressWarnings("unchecked") @@ -58,7 +58,7 @@ class BaseHashTable { /** * Calculate the capacity we should provision, given the expected size. - * + *
* Our capacity must always be a power of 2, and never less than 2 or more * than MAX_CAPACITY. We use 64-bit numbers here to avoid overflow * concerns. @@ -180,7 +180,7 @@ class BaseHashTable { /** * Expand the hash table to a new size. Existing elements will be copied to new slots. */ - final private void rehash(int newSize) { + private void rehash(int newSize) { Object[] prevElements = elements; elements = new Object[newSize]; List ready = new ArrayList<>(); @@ -224,15 +224,15 @@ class BaseHashTable { */ static void unpackSlot(List out, Object[] elements, int slot) { Object value = elements[slot]; - if (value == null) { - return; - } else if (value instanceof Object[]) { - Object[] array = (Object[]) value; - for (Object object : array) { - out.add((T) object); + if (value != null) { + if (value instanceof Object[]) { + Object[] array = (Object[]) value; + for (Object object : array) { + out.add((T) object); + } + } else { + out.add((T) value); } - } else { - out.add((T) value); } } diff --git a/server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java b/server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java index 8efa61fff03..7e16a42ce93 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java @@ -22,7 +22,7 @@ import java.util.Map; /** * A snapshot of some timeline data structures. - * + *
* The snapshot contains historical data for several timeline data structures. * We use an IdentityHashMap to store this data. This way, we can easily drop all of * the snapshot data. diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java index 970c53bb727..56462c3aff5 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java @@ -182,7 +182,7 @@ public class SnapshotRegistry { /** * Creates a new snapshot at the given epoch. - * + *
* If {@code epoch} already exists and it is the last snapshot then just return that snapshot. * * @param epoch The epoch to create the snapshot at. The current epoch diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java index 2977e061643..26f8f245964 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java @@ -29,48 +29,48 @@ import java.util.NoSuchElementException; * We handle divergences between the current state and historical state by copying a * reference to elements that have been deleted or overwritten into the most recent * snapshot tier. - * + *
* Note that there are no keys in SnapshottableHashTable, only values. So it more similar * to a hash set than a hash map. The subclasses implement full-featured maps and sets * using this class as a building block. - * + *
* Each snapshot tier contains a size and a hash table. The size reflects the size at * the time the snapshot was taken. Note that, as an optimization, snapshot tiers will * be null if they don't contain anything. So for example, if snapshot 20 of Object O * contains the same entries as snapshot 10 of that object, the snapshot 20 tier for * object O will be null. - * + *
* The current tier's data is stored in the fields inherited from BaseHashTable. It * would be conceptually simpler to have a separate BaseHashTable object, but since Java * doesn't have value types, subclassing is the only way to avoid another pointer * indirection and the associated extra memory cost. - * + *
* Note that each element in the hash table contains a start epoch, and a value. The * start epoch is there to identify when the object was first inserted. This in turn * determines which snapshots it is a member of. - * + *
* In order to retrieve an object from snapshot E, we start by checking to see if the * object exists in the "current" hash tier. If it does, and its startEpoch extends back * to E, we return that object. Otherwise, we check all the snapshot tiers, starting * with E, and ending with the most recent snapshot, to see if the object is there. * As an optimization, if we encounter the object in a snapshot tier but its epoch is too * new, we know that its value at epoch E must be null, so we can return that immediately. - * + *
* The class hierarchy looks like this: - * + *
  *        Revertable       BaseHashTable
  *              ↑              ↑
  *           SnapshottableHashTable → SnapshotRegistry → Snapshot
  *               ↑             ↑
  *   TimelineHashSet       TimelineHashMap
- *
+ * 
* BaseHashTable is a simple hash table that uses separate chaining. The interface is * pretty bare-bones since this class is not intended to be used directly by end-users. - * + *
* This class, SnapshottableHashTable, has the logic for snapshotting and iterating over * snapshots. This is the core of the snapshotted hash table code and handles the * tiering. - * + *
* TimelineHashSet and TimelineHashMap are mostly wrappers around this * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, * respectively. There's a fair amount of boilerplate for this, but it's necessary so @@ -78,11 +78,11 @@ import java.util.NoSuchElementException; * The accessor APIs have two versions -- one that looks at the current state, and one * that looks at a historical snapshotted state. Mutation APIs only ever mutate the * current state. - * + *
* One very important feature of SnapshottableHashTable is that we support iterating * over a snapshot even while changes are being made to the current state. See the * Javadoc for the iterator for more information about how this is accomplished. - * + *
* All of these classes require external synchronization, and don't support null keys or * values. */ diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java index 857ff87cdce..8239b1410af 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java @@ -27,9 +27,9 @@ import java.util.Set; /** * This is a hash map which can be snapshotted. - * + *
* See {@SnapshottableHashTable} for more details about the implementation. - * + *
* This class requires external synchronization. Null keys and values are not supported. * * @param The key type of the set. diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java index 34efb10fdf9..24705a4dffc 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java @@ -24,9 +24,9 @@ import java.util.Set; /** * This is a hash set which can be snapshotted. - * + *
* See {@SnapshottableHashTable} for more details about the implementation. - * + *
* This class requires external synchronization. Null values are not supported. * * @param The value type of the set. diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java index 7e98ca55e78..c4d7298a22d 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java @@ -22,7 +22,7 @@ import java.util.Iterator; /** * This is a mutable integer which can be snapshotted. - * + *
* This class requires external synchronization. */ public class TimelineInteger implements Revertable { @@ -93,7 +93,6 @@ public class TimelineInteger implements Revertable { set(get() - 1); } - @SuppressWarnings("unchecked") @Override public void executeRevert(long targetEpoch, Delta delta) { IntegerContainer container = (IntegerContainer) delta; diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java index 1379b084004..b882d6455f1 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java @@ -22,7 +22,7 @@ import java.util.Iterator; /** * This is a mutable long which can be snapshotted. - * + *
* This class requires external synchronization. */ public class TimelineLong implements Revertable { @@ -93,7 +93,6 @@ public class TimelineLong implements Revertable { set(get() - 1L); } - @SuppressWarnings("unchecked") @Override public void executeRevert(long targetEpoch, Delta delta) { LongContainer container = (LongContainer) delta; diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java index 0b4a43a249a..f41343c9b3b 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java @@ -23,7 +23,7 @@ import java.util.Objects; /** * This is a mutable reference to an immutable object. It can be snapshotted. - * + *
* This class requires external synchronization. */ public class TimelineObject implements Revertable {