MINOR: Server-Commons cleanup (#14572)

Fixes Javadoc and minor issues in the Java files of the server-common module.

Javadoc is now rendered as its original author intended.

Signed-off-by: Josep Prat <josep.prat@aiven.io>

Reviewers: Mickael Maison <mickael.maison@gmail.com>
Josep Prat committed via GitHub
Commit: eed5e68880
33 files changed:

 1. server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java (8 lines changed)
 2. server-common/src/main/java/org/apache/kafka/queue/EventQueue.java (2 lines changed)
 3. server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java (2 lines changed)
 4. server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java (4 lines changed)
 5. server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java (4 lines changed)
 6. server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java (2 lines changed)
 7. server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java (5 lines changed)
 8. server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java (4 lines changed)
 9. server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java (6 lines changed)
10. server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java (6 lines changed)
11. server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java (2 lines changed)
12. server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java (4 lines changed)
13. server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java (26 lines changed)
14. server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java (4 lines changed)
15. server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java (13 lines changed)
16. server-common/src/main/java/org/apache/kafka/server/util/Json.java (16 lines changed)
17. server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java (2 lines changed)
18. server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java (4 lines changed)
19. server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java (8 lines changed)
20. server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java (2 lines changed)
21. server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java (2 lines changed)
22. server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java (20 lines changed)
23. server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java (2 lines changed)
24. server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java (28 lines changed)
25. server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java (26 lines changed)
26. server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java (2 lines changed)
27. server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java (2 lines changed)
28. server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java (24 lines changed)
29. server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java (4 lines changed)
30. server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java (4 lines changed)
31. server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java (3 lines changed)
32. server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java (3 lines changed)
33. server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java (2 lines changed)

server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java (8 lines changed)

@@ -111,7 +111,7 @@ public class AdminUtils {
      * brokers, it guarantees that the replica distribution is even across brokers and racks.
      * </p>
      * @return a Map from partition id to replica ids
-     * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
+     * @throws AdminOperationException If rack information is supplied, but it is incomplete, or if it is not possible to
      *                                 assign each replica to a unique rack.
      *
      */
@@ -214,13 +214,13 @@ public class AdminUtils {
     /**
      * Given broker and rack information, returns a list of brokers alternated by the rack. Assume
      * this is the rack and its brokers:
-     *
+     * <pre>
      * rack1: 0, 1, 2
      * rack2: 3, 4, 5
      * rack3: 6, 7, 8
-     *
+     * </pre>
      * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
-     *
+     * <br>
      * This is essential to make sure that the assignReplicasToBrokers API can use such list and
      * assign replicas to brokers in a simple round-robin fashion, while ensuring an even
      * distribution of leader and replica counts on each broker and that replicas are
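
The rack-alternated ordering documented above is easy to reproduce. Below is a minimal, self-contained sketch (not the actual AdminUtils code) that interleaves brokers across racks for the example in the Javadoc:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class RackAlternationSketch {
        // Take the i-th broker of each rack in rack order, round-robin,
        // until every rack's broker list is exhausted.
        static List<Integer> rackAlternatedBrokerList(Map<String, List<Integer>> brokersPerRack) {
            List<Integer> result = new ArrayList<>();
            List<Iterator<Integer>> iterators = new ArrayList<>();
            for (List<Integer> brokers : brokersPerRack.values()) {
                iterators.add(brokers.iterator());
            }
            boolean progressed = true;
            while (progressed) {
                progressed = false;
                for (Iterator<Integer> it : iterators) {
                    if (it.hasNext()) {
                        result.add(it.next());
                        progressed = true;
                    }
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Map<String, List<Integer>> racks = new LinkedHashMap<>();
            racks.put("rack1", Arrays.asList(0, 1, 2));
            racks.put("rack2", Arrays.asList(3, 4, 5));
            racks.put("rack3", Arrays.asList(6, 7, 8));
            System.out.println(rackAlternatedBrokerList(racks)); // [0, 3, 6, 1, 4, 7, 2, 5, 8]
        }
    }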

server-common/src/main/java/org/apache/kafka/queue/EventQueue.java (2 lines changed)

@@ -210,7 +210,7 @@ public interface EventQueue extends AutoCloseable {
     /**
      * Asynchronously shut down the event queue.
-     *
+     * <br>
      * No new events will be accepted, and the queue thread will exit after running the existing events.
      * Deferred events will receive TimeoutExceptions.
      *

server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java (2 lines changed)

@@ -201,7 +201,7 @@ public final class KafkaEventQueue implements EventQueue {
         }
     }
-    private void handleEvents() throws InterruptedException {
+    private void handleEvents() {
         Throwable toDeliver = null;
         EventContext toRun = null;
         boolean wasInterrupted = false;

server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java (4 lines changed)

@@ -37,13 +37,13 @@ import java.util.Optional;
 /**
  * This class represents a utility to capture a checkpoint in a file. It writes down to the file in the below format.
- *
+ * <pre>
  * ========= File beginning =========
  * version: int
  * entries-count: int
  * entry-as-string-on-each-line
  * ========= File end ===============
- *
+ * </pre>
  * Each entry is represented as a string on each line in the checkpoint file. {@link EntryFormatter} is used
  * to convert the entry into a string and vice versa.
  *

server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java (4 lines changed)

@@ -27,10 +27,10 @@ import org.apache.kafka.common.record.RecordVersion;
  * This class contains the different Kafka versions.
  * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves.
  * This is only for inter-broker communications - when communicating with clients, the client decides on the API version.
- *
+ * <br>
  * Note that the ID we initialize for each version is important.
  * We consider a version newer than another if it is lower in the enum list (to avoid depending on lexicographic order)
- *
+ * <br>
  * Since the api protocol may change more than once within the same release and to facilitate people deploying code from
  * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example,
  * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a

server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java (2 lines changed)

@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * Holds a range of Producer IDs used for Transactional and EOS producers.
- *
+ * <br>
  * The start and end of the ID block are inclusive.
  */
 public class ProducerIdsBlock {

server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java (5 lines changed)

@@ -26,11 +26,10 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 /**
  * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement
  * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for the given {@code apiKey}.
- *
+ * <br>
  * This can be used as the underlying serialization mechanism for records defined with {@link ApiMessage}s.
- * <p></p>
+ * <br><br>
  * Serialization format for the given {@code ApiMessageAndVersion} is below:
- * <p></p>
  * <pre>
  * [data_frame_version header message]
  * header =&gt; [api_key version]
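
To make the frame layout concrete, here is a minimal sketch of the documented [data_frame_version header message] layout, assuming fixed-width 16-bit fields purely for readability (an assumption: the real serde writes through Kafka's accessor classes and its field encodings may differ):

    import java.nio.ByteBuffer;

    class FrameLayoutSketch {
        static ByteBuffer frame(short dataFrameVersion, short apiKey, short version, byte[] message) {
            ByteBuffer buffer = ByteBuffer.allocate(6 + message.length);
            buffer.putShort(dataFrameVersion); // data_frame_version
            buffer.putShort(apiKey);           // header => api_key
            buffer.putShort(version);          // header => version
            buffer.put(message);               // message payload
            buffer.flip();
            return buffer;
        }
    }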

server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java (4 lines changed)

@@ -25,10 +25,10 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import java.nio.ByteBuffer;
 /**
- * This class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa.. This can be used as serialization protocol for any
+ * This class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa. This can be used as serialization protocol for any
  * metadata records derived of {@code ApiMessage}s. It internally uses {@link AbstractApiMessageSerde} for serialization/deserialization
  * mechanism.
- * <p></p>
+ * <br><br>
  * Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective
  * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective
  * {@code ApiMessage} instance.

server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java (6 lines changed)

@@ -34,16 +34,16 @@ public final class ServerTopicConfigSynonyms {
     /**
      * Maps topic configurations to their equivalent broker configurations.
-     *
+     * <br>
      * Topics can be configured either by setting their dynamic topic configurations, or by
      * setting equivalent broker configurations. For historical reasons, the equivalent broker
      * configurations have different names. This table maps each topic configuration to its
      * equivalent broker configurations.
-     *
+     * <br>
      * In some cases, the equivalent broker configurations must be transformed before they
      * can be used. For example, log.roll.hours must be converted to milliseconds before it
      * can be used as the value of segment.ms.
-     *
+     * <br>
     * The broker configurations will be used in the order specified here. In other words, if
     * both the first and the second synonyms are configured, we will use only the value of
     * the first synonym and ignore the second.
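
A small sketch of the synonym lookup that the Javadoc describes, using the documented segment.ms example; ConfigSynonymSketch and its fields are hypothetical names, not the real class:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    class ConfigSynonymSketch {
        final String brokerConfig;
        final Function<String, String> converter;

        ConfigSynonymSketch(String brokerConfig, Function<String, String> converter) {
            this.brokerConfig = brokerConfig;
            this.converter = converter;
        }

        public static void main(String[] args) {
            // segment.ms falls back to log.roll.ms first, then log.roll.hours (converted to ms).
            List<ConfigSynonymSketch> segmentMsSynonyms = Arrays.asList(
                new ConfigSynonymSketch("log.roll.ms", Function.identity()),
                new ConfigSynonymSketch("log.roll.hours",
                    hours -> Long.toString(Long.parseLong(hours) * 60L * 60L * 1000L)));

            Map<String, String> brokerConfig = Collections.singletonMap("log.roll.hours", "1");
            for (ConfigSynonymSketch synonym : segmentMsSynonyms) {
                String value = brokerConfig.get(synonym.brokerConfig);
                if (value != null) { // the first configured synonym wins; later ones are ignored
                    System.out.println("segment.ms = " + synonym.converter.apply(value)); // 3600000
                    break;
                }
            }
        }
    }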

server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java (6 lines changed)

@@ -66,13 +66,13 @@ final public class ProcessTerminatingFaultHandler implements FaultHandler {
         /**
          * Set if halt or exit should be used.
-         *
+         * <br>
          * When {@code value} is {@code false} {@code Exit.exit} is called, otherwise {@code Exit.halt} is
          * called. The default value is {@code true}.
-         *
+         * <br>
          * The default implementation of {@code Exit.exit} calls {@code Runtime.exit} which
          * blocks on all of the shutdown hooks executing.
-         *
+         * <br>
         * The default implementation of {@code Exit.halt} calls {@code Runtime.halt} which
         * forcibly terminates the JVM.
         */

server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java (2 lines changed)

@@ -38,7 +38,7 @@ import java.util.function.Supplier;
 /**
  * This class encapsulates the default yammer metrics registry for Kafka server,
  * and configures the set of exported JMX metrics for Yammer metrics.
- *
+ * <br>
  * KafkaYammerMetrics.defaultRegistry() should always be used instead of Metrics.defaultRegistry()
  */
 public class KafkaYammerMetrics implements Reconfigurable {

server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java (4 lines changed)

@@ -36,11 +36,11 @@ public class BoundedList<E> implements List<E> {
     private final List<E> underlying;
     public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<E>());
+        return new BoundedList<>(maxLength, new ArrayList<>());
     }
     public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-        return new BoundedList<>(maxLength, new ArrayList<E>(initialCapacity));
+        return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
     }
     public BoundedList(int maxLength, List<E> underlying) {
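
For context, the class bounds an underlying list by refusing adds past maxLength. A minimal sketch of that check (the real BoundedList implements the whole List interface and uses its own exception type; IllegalStateException here is a stand-in):

    import java.util.ArrayList;
    import java.util.List;

    class BoundedListSketch<E> {
        private final int maxLength;
        private final List<E> underlying = new ArrayList<>();

        BoundedListSketch(int maxLength) {
            this.maxLength = maxLength;
        }

        boolean add(E element) {
            if (underlying.size() >= maxLength) {
                throw new IllegalStateException("Cannot add another element to the list " +
                    "because it would exceed the maximum length of " + maxLength);
            }
            return underlying.add(element);
        }
    }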

server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java (26 lines changed)

@@ -71,10 +71,8 @@ public class EndpointReadyFutures {
         String name,
         Map<Endpoint, ? extends CompletionStage<?>> newFutures
     ) {
-        newFutures.forEach((endpoint, future) -> {
-            endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()).
-                add(new EndpointCompletionStage(name, future));
-        });
+        newFutures.forEach((endpoint, future) -> endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()).
+            add(new EndpointCompletionStage(name, future)));
         return this;
     }
@@ -123,9 +121,7 @@ public class EndpointReadyFutures {
         addReadinessFutures("authorizerStart", effectiveStartFutures);
         stages.forEach(stage -> {
             Map<Endpoint, CompletionStage<?>> newReadinessFutures = new HashMap<>();
-            info.endpoints().forEach(endpoint -> {
-                newReadinessFutures.put(endpoint, stage.future);
-            });
+            info.endpoints().forEach(endpoint -> newReadinessFutures.put(endpoint, stage.future));
             addReadinessFutures(stage.name, newReadinessFutures);
         });
         return new EndpointReadyFutures(logContext,
@@ -200,15 +196,13 @@ public class EndpointReadyFutures {
             stages.forEach(stage -> stageNames.add(stage.name));
             EndpointReadyFuture readyFuture = new EndpointReadyFuture(endpoint, stageNames);
             newFutures.put(endpoint, readyFuture.future);
-            stages.forEach(stage -> {
-                stage.future.whenComplete((__, exception) -> {
-                    if (exception != null) {
-                        readyFuture.failStage(stage.name, exception);
-                    } else {
-                        readyFuture.completeStage(stage.name);
-                    }
-                });
-            });
+            stages.forEach(stage -> stage.future.whenComplete((__, exception) -> {
+                if (exception != null) {
+                    readyFuture.failStage(stage.name, exception);
+                } else {
+                    readyFuture.completeStage(stage.name);
+                }
+            }));
         });
         this.futures = Collections.unmodifiableMap(newFutures);
     }

server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java (4 lines changed)

@@ -238,9 +238,9 @@ public class CommandLineUtils {
         try {
             initializeBootstrapProperties(properties,
                 options.has(bootstrapServer) ?
-                    Optional.of(options.valueOf(bootstrapServer).toString()) : Optional.empty(),
+                    Optional.of(options.valueOf(bootstrapServer)) : Optional.empty(),
                 options.has(bootstrapControllers) ?
-                    Optional.of(options.valueOf(bootstrapControllers).toString()) : Optional.empty());
+                    Optional.of(options.valueOf(bootstrapControllers)) : Optional.empty());
         } catch (InitializeBootstrapException e) {
             printUsageAndExit(parser, e.getMessage());
         }

server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java (13 lines changed)

@@ -80,14 +80,11 @@ public class FutureUtils {
         CompletableFuture<? extends T> sourceFuture,
         CompletableFuture<T> destinationFuture
     ) {
-        sourceFuture.whenComplete(new BiConsumer<T, Throwable>() {
-            @Override
-            public void accept(T val, Throwable throwable) {
-                if (throwable != null) {
-                    destinationFuture.completeExceptionally(throwable);
-                } else {
-                    destinationFuture.complete(val);
-                }
+        sourceFuture.whenComplete((BiConsumer<T, Throwable>) (val, throwable) -> {
+            if (throwable != null) {
+                destinationFuture.completeExceptionally(throwable);
+            } else {
+                destinationFuture.complete(val);
             }
         });
     }
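
The refactored chainFuture simply propagates the source future's outcome, value or exception, to the destination. A self-contained usage sketch of the same pattern:

    import java.util.concurrent.CompletableFuture;

    class ChainFutureSketch {
        static <T> void chainFuture(CompletableFuture<? extends T> source, CompletableFuture<T> destination) {
            source.whenComplete((val, throwable) -> {
                if (throwable != null) {
                    destination.completeExceptionally(throwable); // propagate failure
                } else {
                    destination.complete(val);                    // propagate result
                }
            });
        }

        public static void main(String[] args) {
            CompletableFuture<String> source = new CompletableFuture<>();
            CompletableFuture<String> destination = new CompletableFuture<>();
            chainFuture(source, destination);
            source.complete("done");
            System.out.println(destination.join()); // prints "done"
        }
    }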

server-common/src/main/java/org/apache/kafka/server/util/Json.java (16 lines changed)

@@ -30,7 +30,7 @@ import java.util.Optional;
  * Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation.
  */
 public final class Json {
-    private static ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper MAPPER = new ObjectMapper();
     /**
      * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
@@ -48,7 +48,7 @@ public final class Json {
      * exception.
      */
     public static <T> T parseStringAs(String input, Class<T> clazz) throws JsonProcessingException {
-        return mapper.readValue(input, clazz);
+        return MAPPER.readValue(input, clazz);
     }
     /**
@@ -56,21 +56,21 @@ public final class Json {
      */
     public static Optional<JsonValue> parseBytes(byte[] input) throws IOException {
         try {
-            return Optional.ofNullable(mapper.readTree(input)).map(JsonValue::apply);
+            return Optional.ofNullable(MAPPER.readTree(input)).map(JsonValue::apply);
         } catch (JsonProcessingException e) {
             return Optional.empty();
         }
     }
     public static JsonValue tryParseBytes(byte[] input) throws IOException {
-        return JsonValue.apply(mapper.readTree(input));
+        return JsonValue.apply(MAPPER.readTree(input));
     }
     /**
      * Parse a JSON byte array into a generic type T, or throws a JsonProcessingException in the case of exception.
      */
     public static <T> T parseBytesAs(byte[] input, Class<T> clazz) throws IOException {
-        return mapper.readValue(input, clazz);
+        return MAPPER.readValue(input, clazz);
     }
@@ -83,7 +83,7 @@ public final class Json {
         if (input == null || input.isEmpty()) {
             throw new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty");
         } else {
-            return JsonValue.apply(mapper.readTree(input));
+            return JsonValue.apply(MAPPER.readTree(input));
         }
     }
@@ -93,7 +93,7 @@ public final class Json {
      * a jackson-scala dependency).
      */
     public static String encodeAsString(Object obj) throws JsonProcessingException {
-        return mapper.writeValueAsString(obj);
+        return MAPPER.writeValueAsString(obj);
     }
     /**
@@ -102,6 +102,6 @@ public final class Json {
      * a jackson-scala dependency).
      */
     public static byte[] encodeAsBytes(Object obj) throws JsonProcessingException {
-        return mapper.writeValueAsBytes(obj);
+        return MAPPER.writeValueAsBytes(obj);
     }
 }
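
The rename from mapper to MAPPER marks the shared ObjectMapper as a static final constant (Jackson's ObjectMapper is thread-safe once configured, so sharing one instance is fine). A short usage sketch, assuming the kafka server-common jar is on the classpath:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import org.apache.kafka.server.util.Json;
    import java.util.Collections;
    import java.util.Map;

    class JsonUsageSketch {
        public static void main(String[] args) throws JsonProcessingException {
            // Encoding delegates to the shared ObjectMapper constant.
            String encoded = Json.encodeAsString(Collections.singletonMap("broker", 1));
            System.out.println(encoded); // {"broker":1}

            // Decoding into a concrete class.
            Map<?, ?> decoded = Json.parseStringAs(encoded, Map.class);
            System.out.println(decoded.get("broker")); // 1
        }
    }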

server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java (2 lines changed)

@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
- *
+ * <br>
  * It has a pool of kafka-scheduler- threads that do the actual work.
  */
 public class KafkaScheduler implements Scheduler {

server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java (4 lines changed)

@@ -20,14 +20,14 @@ import java.util.concurrent.ScheduledFuture;
 /**
  * A scheduler for running jobs
- *
+ * <br>
  * This interface controls a job scheduler that allows scheduling either repeating background jobs
  * that execute periodically or delayed one-time actions that are scheduled in the future.
  */
 public interface Scheduler {
     /**
-     * Initialize this scheduler so it is ready to accept scheduling of tasks
+     * Initialize this scheduler, so it is ready to accept scheduling of tasks
      */
     void startup();

server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java (8 lines changed)

@@ -19,11 +19,11 @@ package org.apache.kafka.server.util;
 /**
  * This class helps producers throttle throughput.
- *
+ * <br>
  * If targetThroughput >= 0, the resulting average throughput will be approximately
  * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
  * no throttling will occur.
- *
+ * <br>
  * To use, do this between successive send attempts:
  * <pre>
  * {@code
@@ -64,7 +64,7 @@ public class ThroughputThrottler {
      * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
      *                    messages produced so far if you want to throttle message throughput.
      * @param sendStartMs timestamp of the most recently sent message
-     * @return
+     * @return <code>true</code> if throttling should happen
      */
     public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
         if (this.targetThroughput < 0) {
@@ -78,7 +78,7 @@ public class ThroughputThrottler {
     /**
      * Occasionally blocks for small amounts of time to achieve targetThroughput.
-     *
+     * <br>
      * Note that if targetThroughput is 0, this will block extremely aggressively.
      */
     public void throttle() {
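
A usage sketch following the "between successive send attempts" pattern from the Javadoc; the two-argument constructor shown (target rate, start timestamp) is an assumption based on how Kafka's verifiable clients use this class:

    import org.apache.kafka.server.util.ThroughputThrottler;

    class ThrottlerUsageSketch {
        public static void main(String[] args) {
            long startMs = System.currentTimeMillis();
            // Assumed constructor: ThroughputThrottler(long targetThroughput, long startMs).
            ThroughputThrottler throttler = new ThroughputThrottler(100 /* target msgs/sec */, startMs);
            for (long sent = 0; sent < 1000; sent++) {
                long sendStartMs = System.currentTimeMillis();
                // ... send one record here ...
                if (throttler.shouldThrottle(sent, sendStartMs)) {
                    throttler.throttle(); // blocks just long enough to hold the target rate
                }
            }
        }
    }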

server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java (2 lines changed)

@@ -29,7 +29,7 @@ import java.util.function.Function;
 /**
  * A map which presents a lightweight view of another "underlying" map. Values in the
  * underlying map will be translated by a callback before they are returned.
- *
+ * <br>
  * This class is not internally synchronized. (Typically the underlyingMap is treated as
  * immutable.)
  */

server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java (2 lines changed)

@@ -43,7 +43,7 @@ public class JsonArray implements JsonValue {
         Stream<JsonNode> nodeStream = StreamSupport.stream(
             Spliterators.spliteratorUnknownSize(node.elements(), Spliterator.ORDERED),
             false);
-        Stream<JsonValue> results = nodeStream.map(node -> JsonValue.apply(node));
+        Stream<JsonValue> results = nodeStream.map(JsonValue::apply);
         return results.collect(Collectors.toList()).iterator();
     }

server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java (20 lines changed)

@@ -26,18 +26,16 @@ import java.util.Optional;
 /**
  * A simple wrapper over Jackson's JsonNode that enables type safe parsing via the `DecodeJson` type
  * class.
- *
+ * <br>
  * Typical usage would be something like:
- *
- * {{{
- * val jsonNode: JsonNode = ???
- * val jsonObject = JsonValue(jsonNode).asJsonObject
- * val intValue = jsonObject("int_field").to[Int]
- * val optionLongValue = jsonObject("option_long_field").to[Option[Long]]
- * val mapStringIntField = jsonObject("map_string_int_field").to[Map[String, Int]]
- * val seqStringField = jsonObject("seq_string_field").to[Seq[String]
- * }}}
- *
+ * <pre><code>
+ * // Given a jsonNode containing a parsed JSON:
+ * JsonObject jsonObject = JsonValue.apply(jsonNode).asJsonObject();
+ * Integer intField = jsonObject.apply("int_field").to(new DecodeJson.DecodeInteger());
+ * Optional&lt;Integer&gt; optionLongField = jsonObject.apply("option_long_field").to(DecodeJson.decodeOptional(new DecodeJson.DecodeInteger()));
+ * Map&lt;String, Integer&gt; mapStringIntField = jsonObject.apply("map_string_int_field").to(DecodeJson.decodeMap(new DecodeJson.DecodeInteger()));
+ * List&lt;String&gt; seqStringField = jsonObject.apply("seq_string_field").to(DecodeJson.decodeList(new DecodeJson.DecodeString()));
+ * </code></pre>
  * The `to` method throws an exception if the value cannot be converted to the requested type.
  */

server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java (2 lines changed)

@@ -27,7 +27,7 @@ public interface Timer extends AutoCloseable {
     /**
      * Advance the internal clock, executing any tasks whose expiration has been
      * reached within the duration of the passed timeout.
-     * @param timeoutMs
+     * @param timeoutMs the time to advance in milliseconds
      * @return whether or not any tasks were executed
      */
     boolean advanceClock(long timeoutMs) throws InterruptedException;

server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java (28 lines changed)

@@ -21,11 +21,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * Hierarchical Timing Wheels
- *
+ * <br>
  * A simple timing wheel is a circular list of buckets of timer tasks. Let u be the time unit.
 * A timing wheel with size n has n buckets and can hold timer tasks in n * u time interval.
 * Each bucket holds timer tasks that fall into the corresponding time range. At the beginning,
- * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …,
+ * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), &hellip;,
 * the n-th bucket for [u * (n -1), u * n). Every interval of time unit u, the timer ticks and
 * moved to the next bucket then expire all timer tasks in it. So, the timer never insert a task
 * into the bucket for the current time since it is already expired. The timer immediately runs
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 * A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer) whereas priority queue
 * based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, have O(log n)
 * insert/delete cost.
- *
+ * <br>
 * A major drawback of a simple timing wheel is that it assumes that a timer request is within
 * the time interval of n * u from the current time. If a timer request is out of this interval,
 * it is an overflow. A hierarchical timing wheel deals with such overflows. It is a hierarchically
@@ -47,50 +47,50 @@ import java.util.concurrent.atomic.AtomicInteger;
 * are then moved to the finer grain wheels or be executed. The insert (start-timer) cost is O(m)
 * where m is the number of wheels, which is usually very small compared to the number of requests
 * in the system, and the delete (stop-timer) cost is still O(1).
- *
+ * <br>
 * Example
 * Let's say that u is 1 and n is 3. If the start time is c,
 * then the buckets at different levels are:
- *
+ * <pre>
 * level buckets
 * 1 [c,c] [c+1,c+1] [c+2,c+2]
 * 2 [c,c+2] [c+3,c+5] [c+6,c+8]
 * 3 [c,c+8] [c+9,c+17] [c+18,c+26]
- *
+ * </pre>
 * The bucket expiration is at the time of bucket beginning.
 * So at time = c+1, buckets [c,c], [c,c+2] and [c,c+8] are expired.
 * Level 1's clock moves to c+1, and [c+3,c+3] is created.
 * Level 2 and level3's clock stay at c since their clocks move in unit of 3 and 9, respectively.
 * So, no new buckets are created in level 2 and 3.
- *
+ * <br>
 * Note that bucket [c,c+2] in level 2 won't receive any task since that range is already covered in level 1.
 * The same is true for the bucket [c,c+8] in level 3 since its range is covered in level 2.
 * This is a bit wasteful, but simplifies the implementation.
- *
+ * <pre>
 * 1 [c+1,c+1] [c+2,c+2] [c+3,c+3]
 * 2 [c,c+2] [c+3,c+5] [c+6,c+8]
 * 3 [c,c+8] [c+9,c+17] [c+18,c+26]
- *
+ * </pre>
 * At time = c+2, [c+1,c+1] is newly expired.
 * Level 1 moves to c+2, and [c+4,c+4] is created,
- *
+ * <pre>
 * 1 [c+2,c+2] [c+3,c+3] [c+4,c+4]
 * 2 [c,c+2] [c+3,c+5] [c+6,c+8]
 * 3 [c,c+8] [c+9,c+17] [c+18,c+26]
- *
+ * </pre>
 * At time = c+3, [c+2,c+2] is newly expired.
 * Level 2 moves to c+3, and [c+5,c+5] and [c+9,c+11] are created.
 * Level 3 stay at c.
- *
+ * <pre>
 * 1 [c+3,c+3] [c+4,c+4] [c+5,c+5]
 * 2 [c+3,c+5] [c+6,c+8] [c+9,c+11]
 * 3 [c,c+8] [c+9,c+17] [c+18,c+26]
- *
+ * </pre>
 * The hierarchical timing wheels works especially well when operations are completed before they time out.
 * Even when everything times out, it still has advantageous when there are many items in the timer.
 * Its insert cost (including reinsert) and delete cost are O(m) and O(1), respectively while priority
 * queue based timers takes O(log N) for both insert and delete where N is the number of items in the queue.
- *
+ * <br>
 * This class is not thread-safe. There should not be any add calls while advanceClock is executing.
 * It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe.
 */
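
A minimal sketch of the bucket arithmetic described above: for a wheel with tick size u and wheel size n, a task expiring at time t lands in bucket (t / u) % n, and overflows to the next (coarser) level when t is more than n * u ahead of the wheel's current time. This is illustrative, not the actual TimingWheel code:

    class TimingWheelSketch {
        static int bucketIndex(long expirationMs, long tickMs, int wheelSize) {
            long virtualId = expirationMs / tickMs; // which tick the task falls into
            return (int) (virtualId % wheelSize);   // slot in the circular bucket list
        }

        static boolean overflows(long expirationMs, long currentTimeMs, long tickMs, int wheelSize) {
            // Out of this wheel's n * u interval: hand off to the next level up.
            return expirationMs >= currentTimeMs + tickMs * wheelSize;
        }

        public static void main(String[] args) {
            // u = 1 ms, n = 3, start time c = 0: tasks at 0, 1, 2 fit in level 1;
            // a task at 5 overflows to the level whose tick size is n * u = 3 ms.
            System.out.println(bucketIndex(2, 1, 3));  // 2
            System.out.println(overflows(5, 0, 1, 3)); // true
        }
    }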

server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java (26 lines changed)

@@ -22,14 +22,14 @@ import java.util.List;
 /**
  * A hash table which uses separate chaining.
- *
+ * <br>
  * In order to optimize memory consumption a bit, the common case where there is
 * one element per slot is handled by simply placing the element in the slot,
 * and the case where there are multiple elements is handled by creating an
 * array and putting that in the slot. Java is storing type info in memory
 * about every object whether we want it or not, so let's get some benefit
 * out of it.
- *
+ * <br>
 * Arrays and null values cannot be inserted.
 */
 @SuppressWarnings("unchecked")
@@ -58,7 +58,7 @@ class BaseHashTable<T> {
     /**
      * Calculate the capacity we should provision, given the expected size.
-     *
+     * <br>
      * Our capacity must always be a power of 2, and never less than 2 or more
      * than MAX_CAPACITY. We use 64-bit numbers here to avoid overflow
     * concerns.
@@ -180,7 +180,7 @@ class BaseHashTable<T> {
     /**
      * Expand the hash table to a new size. Existing elements will be copied to new slots.
     */
-    final private void rehash(int newSize) {
+    private void rehash(int newSize) {
         Object[] prevElements = elements;
         elements = new Object[newSize];
         List<Object> ready = new ArrayList<>();
@@ -224,15 +224,15 @@ class BaseHashTable<T> {
     */
     static <T> void unpackSlot(List<T> out, Object[] elements, int slot) {
         Object value = elements[slot];
-        if (value == null) {
-            return;
-        } else if (value instanceof Object[]) {
-            Object[] array = (Object[]) value;
-            for (Object object : array) {
-                out.add((T) object);
+        if (value != null) {
+            if (value instanceof Object[]) {
+                Object[] array = (Object[]) value;
+                for (Object object : array) {
+                    out.add((T) object);
+                }
+            } else {
+                out.add((T) value);
             }
-        } else {
-            out.add((T) value);
         }
     }
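
The restructured unpackSlot above is the heart of the one-element-per-slot optimization: a slot holds either null, a single element, or an Object[] of elements. A runnable extraction of the same logic:

    import java.util.ArrayList;
    import java.util.List;

    class SlotSketch {
        @SuppressWarnings("unchecked")
        static <T> void unpackSlot(List<T> out, Object[] elements, int slot) {
            Object value = elements[slot];
            if (value != null) {
                if (value instanceof Object[]) {
                    for (Object object : (Object[]) value) {
                        out.add((T) object); // multi-element slot backed by an array
                    }
                } else {
                    out.add((T) value);      // common single-element slot
                }
            }
        }

        public static void main(String[] args) {
            Object[] slots = new Object[] { "a", new Object[] { "b", "c" }, null };
            List<String> out = new ArrayList<>();
            for (int i = 0; i < slots.length; i++) {
                unpackSlot(out, slots, i);
            }
            System.out.println(out); // [a, b, c]
        }
    }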

server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java (2 lines changed)

@@ -22,7 +22,7 @@ import java.util.Map;
 /**
  * A snapshot of some timeline data structures.
- *
+ * <br>
  * The snapshot contains historical data for several timeline data structures.
 * We use an IdentityHashMap to store this data. This way, we can easily drop all of
 * the snapshot data.

server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java (2 lines changed)

@@ -182,7 +182,7 @@ public class SnapshotRegistry {
     /**
      * Creates a new snapshot at the given epoch.
-     *
+     * <br>
      * If {@code epoch} already exists and it is the last snapshot then just return that snapshot.
      *
      * @param epoch The epoch to create the snapshot at. The current epoch

server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java (24 lines changed)

@@ -29,48 +29,48 @@ import java.util.NoSuchElementException;
 * We handle divergences between the current state and historical state by copying a
 * reference to elements that have been deleted or overwritten into the most recent
 * snapshot tier.
- *
+ * <br>
 * Note that there are no keys in SnapshottableHashTable, only values. So it more similar
 * to a hash set than a hash map. The subclasses implement full-featured maps and sets
 * using this class as a building block.
- *
+ * <br>
 * Each snapshot tier contains a size and a hash table. The size reflects the size at
 * the time the snapshot was taken. Note that, as an optimization, snapshot tiers will
 * be null if they don't contain anything. So for example, if snapshot 20 of Object O
 * contains the same entries as snapshot 10 of that object, the snapshot 20 tier for
 * object O will be null.
- *
+ * <br>
 * The current tier's data is stored in the fields inherited from BaseHashTable. It
 * would be conceptually simpler to have a separate BaseHashTable object, but since Java
 * doesn't have value types, subclassing is the only way to avoid another pointer
 * indirection and the associated extra memory cost.
- *
+ * <br>
 * Note that each element in the hash table contains a start epoch, and a value. The
 * start epoch is there to identify when the object was first inserted. This in turn
 * determines which snapshots it is a member of.
- *
+ * <br>
 * In order to retrieve an object from snapshot E, we start by checking to see if the
 * object exists in the "current" hash tier. If it does, and its startEpoch extends back
 * to E, we return that object. Otherwise, we check all the snapshot tiers, starting
 * with E, and ending with the most recent snapshot, to see if the object is there.
 * As an optimization, if we encounter the object in a snapshot tier but its epoch is too
 * new, we know that its value at epoch E must be null, so we can return that immediately.
- *
+ * <br>
 * The class hierarchy looks like this:
- *
+ * <pre>
 * Revertable BaseHashTable
 *
 * SnapshottableHashTable SnapshotRegistry Snapshot
 *
 * TimelineHashSet TimelineHashMap
- *
+ * </pre>
 * BaseHashTable is a simple hash table that uses separate chaining. The interface is
 * pretty bare-bones since this class is not intended to be used directly by end-users.
- *
+ * <br>
 * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
 * snapshots. This is the core of the snapshotted hash table code and handles the
 * tiering.
- *
+ * <br>
 * TimelineHashSet and TimelineHashMap are mostly wrappers around this
 * SnapshottableHashTable class. They implement standard Java APIs for Set and Map,
 * respectively. There's a fair amount of boilerplate for this, but it's necessary so
@@ -78,11 +78,11 @@ import java.util.NoSuchElementException;
 * The accessor APIs have two versions -- one that looks at the current state, and one
 * that looks at a historical snapshotted state. Mutation APIs only ever mutate the
 * current state.
- *
+ * <br>
 * One very important feature of SnapshottableHashTable is that we support iterating
 * over a snapshot even while changes are being made to the current state. See the
 * Javadoc for the iterator for more information about how this is accomplished.
- *
+ * <br>
 * All of these classes require external synchronization, and don't support null keys or
 * values.
 */

server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java (4 lines changed)

@@ -27,9 +27,9 @@ import java.util.Set;
 /**
  * This is a hash map which can be snapshotted.
- *
+ * <br>
  * See {@SnapshottableHashTable} for more details about the implementation.
- *
+ * <br>
  * This class requires external synchronization. Null keys and values are not supported.
  *
  * @param <K> The key type of the set.
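
A usage sketch of the snapshot/read pattern these timeline classes support; the calls shown (SnapshotRegistry.getOrCreateSnapshot and the epoch-taking get) match the public API as of this commit, but treat the exact signatures as assumptions:

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;

    class TimelineMapSketch {
        public static void main(String[] args) {
            SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
            TimelineHashMap<String, Integer> map = new TimelineHashMap<>(registry, 16);

            map.put("a", 1);
            registry.getOrCreateSnapshot(100); // freeze state at epoch 100
            map.put("a", 2);                   // mutates the current state only

            System.out.println(map.get("a"));      // 2 (current state)
            System.out.println(map.get("a", 100)); // 1 (as of epoch 100)
        }
    }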

server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java (4 lines changed)

@@ -24,9 +24,9 @@ import java.util.Set;
 /**
  * This is a hash set which can be snapshotted.
- *
+ * <br>
  * See {@SnapshottableHashTable} for more details about the implementation.
- *
+ * <br>
  * This class requires external synchronization. Null values are not supported.
  *
  * @param <T> The value type of the set.

server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java (3 lines changed)

@@ -22,7 +22,7 @@ import java.util.Iterator;
 /**
  * This is a mutable integer which can be snapshotted.
- *
+ * <br>
  * This class requires external synchronization.
 */
 public class TimelineInteger implements Revertable {
@@ -93,7 +93,6 @@ public class TimelineInteger implements Revertable {
         set(get() - 1);
     }
-    @SuppressWarnings("unchecked")
     @Override
     public void executeRevert(long targetEpoch, Delta delta) {
         IntegerContainer container = (IntegerContainer) delta;

server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java (3 lines changed)

@@ -22,7 +22,7 @@ import java.util.Iterator;
 /**
  * This is a mutable long which can be snapshotted.
- *
+ * <br>
  * This class requires external synchronization.
 */
 public class TimelineLong implements Revertable {
@@ -93,7 +93,6 @@ public class TimelineLong implements Revertable {
         set(get() - 1L);
     }
-    @SuppressWarnings("unchecked")
     @Override
     public void executeRevert(long targetEpoch, Delta delta) {
         LongContainer container = (LongContainer) delta;

server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java (2 lines changed)

@@ -23,7 +23,7 @@ import java.util.Objects;
 /**
  * This is a mutable reference to an immutable object. It can be snapshotted.
- *
+ * <br>
  * This class requires external synchronization.
 */
 public class TimelineObject<T> implements Revertable {
