
KAFKA-15485: Fix "this-escape" compiler warnings introduced by JDK 21 (1/N) (#14427)

This is one of the steps required for Kafka to compile with Java 21.

For each case, one of the following fixes was applied (see the sketch below for an illustration):
1. Suppress the warning if fixing it could result in an incompatible change (for public classes)
2. Add `final` to one or more methods so that the escape is not possible
3. Replace method calls with direct field access

In addition, we fix a couple of compiler warnings related to deprecated references in the `core` module.

See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
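
As a rough illustration of what the new lint flags and of the three fixes listed above, here is a minimal, self-contained sketch; the classes are hypothetical examples, not code from this PR:

```java
// A minimal sketch, assuming JDK 21+ (compile with: javac -Xlint:this-escape Leaky.java).
// These classes are illustrative only; they are not part of the Kafka codebase.

// The problem: the constructor calls an overridable method, so a subclass could
// observe a partially constructed `this`.
class Leaky {
    private final int size;

    Leaky(int size) {
        this.size = size;
        init();                  // javac: [this-escape] possible 'this' escape
    }

    protected void init() { }    // overridable, hence the possible escape
}

// Fix 2: make the method called from the constructor final so it cannot be overridden.
class FixedWithFinal {
    private final int size;

    FixedWithFinal(int size) {
        this.size = size;
        init();                  // no warning: init() can no longer be overridden
    }

    protected final void init() { }
}

// Fix 3: replace the method call with direct field access (or a local variable),
// so no overridable code runs while the object is still under construction.
class FixedWithFieldAccess {
    private final int size;
    private final boolean empty;

    FixedWithFieldAccess(int size) {
        this.size = size;
        this.empty = size == 0;  // was: a call to the overridable isEmpty()
    }

    public boolean isEmpty() {
        return empty;
    }
}

// Fix 1: for public classes where adding `final` would be an incompatible change,
// the warning is suppressed on the constructor instead.
class FixedWithSuppression {
    @SuppressWarnings("this-escape")
    FixedWithSuppression() {
        init();
    }

    protected void init() { }
}
```

The suppression route (fix 1) is the same `@SuppressWarnings("this-escape")` annotation that appears throughout the diff below.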

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
Author: Ismael Juma, commit 98febb989a (pull/14433/head)

83 changed files (changed lines per file in parentheses):
1. clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java (2)
2. clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java (4)
3. clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java (2)
4. clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java (3)
5. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (2)
6. clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
7. clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java (5)
8. clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
9. clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java (1)
10. clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java (2)
11. clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (2)
12. clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java (2)
13. clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java (2)
14. clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java (1)
15. clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java (1)
16. clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java (1)
17. clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java (16)
18. clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java (2)
19. clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java (2)
20. clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java (2)
21. clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java (1)
22. clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java (2)
23. clients/src/main/java/org/apache/kafka/common/utils/PureJavaCrc32C.java (2)
24. clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java (4)
25. clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java (1)
26. clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java (1)
27. clients/src/test/java/org/apache/kafka/common/network/SslSender.java (1)
28. clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java (1)
29. clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java (1)
30. clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java (1)
31. connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java (1)
32. connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java (1)
33. connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)
34. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java (1)
35. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)
36. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
37. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java (6)
38. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java (4)
39. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java (1)
40. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
41. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java (6)
42. connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java (1)
43. core/src/main/java/kafka/log/remote/RemoteLogManager.java (7)
44. generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java (2)
45. metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java (2)
46. raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java (1)
47. server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java (2)
48. server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java (1)
49. server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java (2)
50. server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java (1)
51. server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java (1)
52. server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java (1)
53. server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java (1)
54. server-common/src/test/java/org/apache/kafka/server/util/MockTime.java (1)
55. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java (3)
56. storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java (6)
57. storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java (1)
58. storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java (1)
59. storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java (2)
60. storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java (1)
61. storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java (1)
62. streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java (1)
63. streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java (1)
64. streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (1)
65. streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java (1)
66. streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java (1)
67. streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java (2)
68. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java (1)
69. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (2)
70. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (1)
71. streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java (1)
72. streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java (2)
73. streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java (1)
74. streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java (1)
75. streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java (3)
76. streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java (1)
77. streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java (1)
78. trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java (1)
79. trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java (1)
80. trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java (1)
81. trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java (1)
82. trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java (1)
83. trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java (1)

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@@ -1291,7 +1291,7 @@ public abstract class AbstractCoordinator implements Closeable {
}
}
protected Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) {
protected final Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) {
return new Meter(new WindowedCount(),
metrics.metricName(baseName + "-rate", groupName,
String.format("The number of %s per second", descriptiveName)),

4
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java

@@ -266,11 +266,11 @@ public class DefaultBackgroundThread extends KafkaThread {
return this.running;
}
public void wakeup() {
public final void wakeup() {
networkClientDelegate.wakeup();
}
public void close() {
public final void close() {
this.running = false;
this.wakeup();
Utils.closeQuietly(networkClientDelegate, "network client utils");

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java

@@ -77,7 +77,7 @@ public class MembershipManagerImpl implements MembershipManager {
* @param assignorSelection New assignor selection
* @throws IllegalArgumentException If the provided assignor selection is null
*/
public void setAssignorSelection(AssignorSelection assignorSelection) {
public final void setAssignorSelection(AssignorSelection assignorSelection) {
if (assignorSelection == null) {
throw new IllegalArgumentException("Assignor selection cannot be null");
}

3
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java

@@ -84,6 +84,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
private final ApiVersions apiVersions;
private final NetworkClientDelegate networkClientDelegate;
@SuppressWarnings("this-escape")
public OffsetsRequestManager(final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
final IsolationLevel isolationLevel,
@@ -620,4 +621,4 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
int requestsToSend() {
return requestsToSend.size();
}
}
}

2
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -333,7 +333,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
// visible for testing
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "this-escape"})
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,

2
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java

@@ -101,7 +101,7 @@ public class AbstractConfig {
* the constructor to resolve any variables in {@code originals}; may be null or empty
* @param doLog whether the configurations should be logged
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "this-escape"})
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
/* check that all the keys are really strings */
for (Map.Entry<?, ?> entry : originals.entrySet())

5
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java

@@ -1172,10 +1172,11 @@ public class ConfigDef {
boolean internalConfig) {
this.name = name;
this.type = type;
this.defaultValue = NO_DEFAULT_VALUE.equals(defaultValue) ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
boolean hasDefault = !NO_DEFAULT_VALUE.equals(defaultValue);
this.defaultValue = hasDefault ? parseType(name, defaultValue, type) : NO_DEFAULT_VALUE;
this.validator = validator;
this.importance = importance;
if (this.validator != null && hasDefault())
if (this.validator != null && hasDefault)
this.validator.ensureValid(name, this.defaultValue);
this.documentation = documentation;
this.dependents = dependents;

1
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java

@@ -154,6 +154,7 @@ public class Metrics implements Closeable {
* @param enableExpiration true if the metrics instance can garbage collect inactive sensors, false otherwise
* @param metricsContext The metricsContext to initialize metrics reporter with
*/
@SuppressWarnings("this-escape")
public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration,
MetricsContext metricsContext) {
this.config = defaultConfig;

1
clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java

@@ -53,6 +53,7 @@ public class Schema extends Type {
*
* @throws SchemaException If the given list have duplicate fields
*/
@SuppressWarnings("this-escape")
public Schema(boolean tolerateMissingFieldsWithDefaults, Field... fs) {
this.fields = new BoundField[fs.length];
this.fieldsByName = new HashMap<>();

2
clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java

@@ -116,7 +116,7 @@ public class LazyDownConversionRecords implements BaseRecords {
")";
}
public java.util.Iterator<ConvertedRecords<?>> iterator(long maximumReadSize) {
public final java.util.Iterator<ConvertedRecords<?>> iterator(long maximumReadSize) {
// We typically expect only one iterator instance to be created, so null out the first converted batch after
// first use to make it available for GC.
ConvertedRecords firstBatch = firstConvertedBatch;

2
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java

@@ -224,7 +224,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
return isTransactional;
}
public boolean hasDeleteHorizonMs() {
public final boolean hasDeleteHorizonMs() {
return magic >= RecordBatch.MAGIC_VALUE_V2 && deleteHorizonMs >= 0L;
}

2
clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java

@@ -65,7 +65,7 @@ public class DeleteAclsResponse extends AbstractResponse {
data.setThrottleTimeMs(throttleTimeMs);
}
public List<DeleteAclsResponseData.DeleteAclsFilterResult> filterResults() {
public final List<DeleteAclsResponseData.DeleteAclsFilterResult> filterResults() {
return data.filterResults();
}

2
clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java

@@ -85,7 +85,7 @@ public class DescribeAclsResponse extends AbstractResponse {
return errorCounts(Errors.forCode(data.errorCode()));
}
public List<DescribeAclsResource> acls() {
public final List<DescribeAclsResource> acls() {
return data.resources();
}

1
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java

@@ -167,6 +167,7 @@ public class SaslClientAuthenticator implements Authenticator {
// Version of SaslHandshake request/responses
private short saslHandshakeVersion;
@SuppressWarnings("this-escape")
public SaslClientAuthenticator(Map<String, ?> configs,
AuthenticateCallbackHandler callbackHandler,
String node,

1
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientProvider.java

@@ -25,6 +25,7 @@ import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslCli
public class OAuthBearerSaslClientProvider extends Provider {
private static final long serialVersionUID = 1L;
@SuppressWarnings("this-escape")
protected OAuthBearerSaslClientProvider() {
super("SASL/OAUTHBEARER Client Provider", 1.0, "SASL/OAUTHBEARER Client Provider for Kafka");
put("SaslClientFactory." + OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,

1
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java

@@ -25,6 +25,7 @@ import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslSer
public class OAuthBearerSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
@SuppressWarnings("this-escape")
protected OAuthBearerSaslServerProvider() {
super("SASL/OAUTHBEARER Server Provider", 1.0, "SASL/OAUTHBEARER Server Provider for Kafka");
put("SaslServerFactory." + OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,

16
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java

@@ -121,7 +121,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
*
* @return the 3 or 5 dot-separated sections of the JWT compact serialization
*/
public List<String> splits() {
public final List<String> splits() {
return splits;
}
@@ -130,7 +130,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
*
* @return the JOSE header
*/
public Map<String, Object> header() {
public final Map<String, Object> header() {
return header;
}
@@ -159,7 +159,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
*
* @return the (always non-null but possibly empty) claims
*/
public Map<String, Object> claims() {
public final Map<String, Object> claims() {
return claims;
}
@@ -191,7 +191,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
* Number.class, or List.class
* @return true if the claim exists and is the given type, otherwise false
*/
public boolean isClaimType(String claimName, Class<?> type) {
public final boolean isClaimType(String claimName, Class<?> type) {
Object value = rawClaim(claimName);
Objects.requireNonNull(type);
if (value == null)
@@ -215,7 +215,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
* @throws OAuthBearerIllegalTokenException
* if the claim exists but is not the given type
*/
public <T> T claim(String claimName, Class<T> type) throws OAuthBearerIllegalTokenException {
public final <T> T claim(String claimName, Class<T> type) throws OAuthBearerIllegalTokenException {
Object value = rawClaim(claimName);
try {
return Objects.requireNonNull(type).cast(value);
@@ -233,7 +233,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
* the mandatory JWT claim name
* @return the raw claim value, if it exists, otherwise null
*/
public Object rawClaim(String claimName) {
public final Object rawClaim(String claimName) {
return claims().get(Objects.requireNonNull(claimName));
}
@@ -248,7 +248,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
* @throws OAuthBearerIllegalTokenException
* if the claim value is the incorrect type
*/
public Number expirationTime() throws OAuthBearerIllegalTokenException {
public final Number expirationTime() throws OAuthBearerIllegalTokenException {
return claim("exp", Number.class);
}
@@ -343,7 +343,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
}
private Set<String> calculateScope() {
String scopeClaimName = scopeClaimName();
String scopeClaimName = this.scopeClaimName;
if (isClaimType(scopeClaimName, String.class)) {
String scopeClaimValue = claim(scopeClaimName, String.class);
if (Utils.isBlank(scopeClaimValue))

2
clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerProvider.java

@@ -25,7 +25,7 @@ public class PlainSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
@SuppressWarnings("deprecation")
@SuppressWarnings({"deprecation", "this-escape"})
protected PlainSaslServerProvider() {
super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka");
put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName());

2
clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.java

@@ -25,7 +25,7 @@ public class ScramSaslClientProvider extends Provider {
private static final long serialVersionUID = 1L;
@SuppressWarnings("deprecation")
@SuppressWarnings({"deprecation", "this-escape"})
protected ScramSaslClientProvider() {
super("SASL/SCRAM Client Provider", 1.0, "SASL/SCRAM Client Provider for Kafka");
for (ScramMechanism mechanism : ScramMechanism.values())

2
clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.java

@@ -25,7 +25,7 @@ public class ScramSaslServerProvider extends Provider {
private static final long serialVersionUID = 1L;
@SuppressWarnings("deprecation")
@SuppressWarnings({"deprecation", "this-escape"})
protected ScramSaslServerProvider() {
super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider for Kafka");
for (ScramMechanism mechanism : ScramMechanism.values())

1
clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java

@@ -553,6 +553,7 @@ public class ImplicitLinkedHashCollection<E extends ImplicitLinkedHashCollection
* @param iter We will add all the elements accessible through this iterator
* to the set.
*/
@SuppressWarnings("this-escape")
public ImplicitLinkedHashCollection(Iterator<E> iter) {
clear(0);
while (iter.hasNext()) {

2
clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java

@@ -34,11 +34,13 @@ public class KafkaThread extends Thread {
return new KafkaThread(name, runnable, false);
}
@SuppressWarnings("this-escape")
public KafkaThread(final String name, boolean daemon) {
super(name);
configureThread(name, daemon);
}
@SuppressWarnings("this-escape")
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);

2
clients/src/main/java/org/apache/kafka/common/utils/PureJavaCrc32C.java

@@ -49,7 +49,7 @@ public class PureJavaCrc32C implements Checksum {
}
@Override
public void reset() {
public final void reset() {
crc = 0xffffffff;
}

4
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@@ -118,6 +118,7 @@ public class MockAdminClient extends AdminClient {
private Map<String, Short> minSupportedFeatureLevels = Collections.emptyMap();
private Map<String, Short> maxSupportedFeatureLevels = Collections.emptyMap();
@SuppressWarnings("this-escape")
public Builder() {
numBrokers(1);
}
@@ -217,6 +218,7 @@ public class MockAdminClient extends AdminClient {
Collections.emptyMap());
}
@SuppressWarnings("this-escape")
private MockAdminClient(
List<Node> brokers,
Node controller,
@@ -250,7 +252,7 @@ public class MockAdminClient extends AdminClient {
this.maxSupportedFeatureLevels = new HashMap<>(maxSupportedFeatureLevels);
}
synchronized public void controller(Node controller) {
public synchronized void controller(Node controller) {
if (!brokers.contains(controller))
throw new IllegalArgumentException("The controller node must be in the list of brokers");
this.controller = controller;

1
clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java

@@ -101,6 +101,7 @@ public class NioEchoServer extends Thread {
new DelegationTokenCache(ScramMechanism.mechanismNames()));
}
@SuppressWarnings("this-escape")
public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache,
int failedAuthenticationDelayMs, Time time, DelegationTokenCache tokenCache) throws Exception {

1
clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java

@@ -25,6 +25,7 @@ import java.net.Socket;
*/
public class PlaintextSender extends Thread {
@SuppressWarnings("this-escape")
public PlaintextSender(final InetSocketAddress serverAddress, final byte[] payload) {
super(() -> {
try (Socket connection = new Socket(serverAddress.getAddress(), serverAddress.getPort());

1
clients/src/test/java/org/apache/kafka/common/network/SslSender.java

@@ -33,6 +33,7 @@ public class SslSender extends Thread {
private final byte[] payload;
private final CountDownLatch handshaked = new CountDownLatch(1);
@SuppressWarnings("this-escape")
public SslSender(String tlsProtocol, InetSocketAddress serverAddress, byte[] payload) {
this.tlsProtocol = tlsProtocol;
this.serverAddress = serverAddress;

1
clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java

@@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@SuppressWarnings("this-escape")
public class DefaultSslEngineFactoryTest {
/*

1
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java

@@ -27,6 +27,7 @@ public class TestProvider extends Provider {
this("TestProvider", 0.1, "provider for test cases");
}
@SuppressWarnings("this-escape")
protected TestProvider(String name, double version, String info) {
super(name, version, info);
super.put(KEY_MANAGER_FACTORY, TestKeyManagerFactory.class.getName());

1
clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java

@@ -42,6 +42,7 @@ public class MockScheduler implements Scheduler, MockTime.Listener {
*/
private final TreeMap<Long, List<KafkaFutureImpl<Long>>> waiters = new TreeMap<>();
@SuppressWarnings("this-escape")
public MockScheduler(MockTime time) {
this.time = time;
time.addListener(this);

1
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java

@@ -87,6 +87,7 @@ public class JsonConverterConfig extends ConverterConfig {
private final DecimalFormat decimalFormat;
private final boolean replaceNullWithDefault;
@SuppressWarnings("this-escape")
public JsonConverterConfig(Map<String, ?> props) {
super(CONFIG, props);
this.schemasEnabled = getBoolean(SCHEMAS_ENABLE_CONFIG);

1
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java

@@ -116,6 +116,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
private final ReplicationPolicy replicationPolicy;
@SuppressWarnings("this-escape")
protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String> props) {
super(configDef, props, true);
replicationPolicy = getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);

1
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java

@@ -90,6 +90,7 @@ public class MirrorMakerConfig extends AbstractConfig {
private final Map<String, String> rawProperties;
@SuppressWarnings("this-escape")
public MirrorMakerConfig(Map<String, String> props) {
super(config(), props, true);
plugins = new Plugins(originalsStrings());

1
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java

@@ -247,6 +247,7 @@ public class SourceConnectorConfig extends ConnectorConfig {
return newDef;
}
@SuppressWarnings("this-escape")
public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean createTopics) {
super(plugins, configDef(), props);
if (createTopics && props.entrySet().stream().anyMatch(e -> e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {

1
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

@@ -171,6 +171,7 @@ public class Worker {
this(workerId, time, plugins, config, globalOffsetBackingStore, Executors.newCachedThreadPool(), connectorClientConfigOverridePolicy, Admin::create);
}
@SuppressWarnings("this-escape")
Worker(
String workerId,
Time time,

1
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java

@@ -436,6 +436,7 @@ public class WorkerConfig extends AbstractConfig {
}
}
@SuppressWarnings("this-escape")
public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
logInternalConverterRemovalWarnings(props);

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java

@@ -381,15 +381,15 @@ public class WorkerConnector implements Runnable {
}
}
public boolean isSinkConnector() {
public final boolean isSinkConnector() {
return ConnectUtils.isSinkConnector(connector);
}
public boolean isSourceConnector() {
public final boolean isSourceConnector() {
return ConnectUtils.isSourceConnector(connector);
}
protected String connectorType() {
protected final String connectorType() {
if (isSinkConnector())
return "sink";
if (isSourceConnector())

4
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java

@@ -78,7 +78,7 @@ public class WorkerInfo {
/**
* Collect general runtime information.
*/
protected void addRuntimeInfo() {
protected final void addRuntimeInfo() {
List<String> jvmArgs = RUNTIME.getInputArguments();
values.put("jvm.args", Utils.join(jvmArgs, ", "));
String[] jvmSpec = {
@@ -94,7 +94,7 @@ public class WorkerInfo {
/**
* Collect system information.
*/
protected void addSystemInfo() {
protected final void addSystemInfo() {
String[] osInfo = {
OS.getName(),
OS.getArch(),

1
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java

@@ -571,6 +571,7 @@ public class DistributedConfig extends WorkerConfig {
}
// Visible for testing
@SuppressWarnings("this-escape")
DistributedConfig(Crypto crypto, Map<String, String> props) {
super(config(crypto), props);
this.crypto = crypto;

1
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java

@@ -65,6 +65,7 @@ public class Plugins {
}
// VisibleForTesting
@SuppressWarnings("this-escape")
Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) {
String pluginPath = WorkerConfig.pluginPath(props);
PluginDiscoveryMode discoveryMode = WorkerConfig.pluginDiscovery(props);

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java

@@ -102,7 +102,7 @@ public abstract class RestServer {
/**
* Adds Jetty connector for each configured listener
*/
public void createConnectors(List<String> listeners, List<String> adminListeners) {
public final void createConnectors(List<String> listeners, List<String> adminListeners) {
List<Connector> connectors = new ArrayList<>();
for (String listener : listeners) {
@@ -125,14 +125,14 @@ public abstract class RestServer {
/**
* Creates regular (non-admin) Jetty connector according to configuration
*/
public Connector createConnector(String listener) {
public final Connector createConnector(String listener) {
return createConnector(listener, false);
}
/**
* Creates Jetty connector according to configuration
*/
public Connector createConnector(String listener, boolean isAdmin) {
public final Connector createConnector(String listener, boolean isAdmin) {
Matcher listenerMatcher = LISTENER_PATTERN.matcher(listener);
if (!listenerMatcher.matches())

1
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java

@@ -331,6 +331,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
this(converter, config, configTransformer, adminSupplier, clientIdBase, Time.SYSTEM);
}
@SuppressWarnings("this-escape")
KafkaConfigBackingStore(Converter converter, DistributedConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier, String clientIdBase, Time time) {
this.lock = new Object();
this.started = false;

7
core/src/main/java/kafka/log/remote/RemoteLogManager.java

@@ -83,7 +83,6 @@ import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -227,8 +226,9 @@ public class RemoteLogManager implements Closeable {
}
}
@SuppressWarnings("removal")
RemoteStorageManager createRemoteStorageManager() {
return AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() {
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() {
private final String classPath = rlmConfig.remoteStorageManagerClassPath();
public RemoteStorageManager run() {
@@ -249,8 +249,9 @@ public class RemoteLogManager implements Closeable {
remoteLogStorageManager.configure(rsmProps);
}
@SuppressWarnings("removal")
RemoteLogMetadataManager createRemoteLogMetadataManager() {
return AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() {
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() {
private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();
public RemoteLogMetadataManager run() {
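
The two RemoteLogManager hunks above also touch the deprecation warnings mentioned in the description: the `java.security.AccessController` import is dropped and the fully qualified name is used inside methods annotated with `@SuppressWarnings("removal")`, since `AccessController` is deprecated for removal and an import statement cannot carry a suppression annotation. A minimal sketch of that pattern, using a hypothetical helper class rather than the actual Kafka code:

```java
import java.security.PrivilegedAction;

// A minimal sketch, assuming JDK 17+ where AccessController is deprecated for removal.
// PrivilegedCall and doPrivileged() are hypothetical names, not Kafka helpers.
public final class PrivilegedCall {

    // The fully qualified name keeps the terminally deprecated class out of the
    // import list, and the annotation suppresses the removal warning at the call site.
    @SuppressWarnings("removal")
    static <T> T doPrivileged(PrivilegedAction<T> action) {
        return java.security.AccessController.doPrivileged(action);
    }

    public static void main(String[] args) {
        String loader = doPrivileged(() ->
                String.valueOf(Thread.currentThread().getContextClassLoader()));
        System.out.println(loader);
    }
}
```

Compiling this on JDK 21 should not produce a removal warning, since the only reference to the terminally deprecated class sits inside the annotated method.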

2
generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java

@@ -409,7 +409,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
Versions parentVersions) {
headerGenerator.addImport(MessageGenerator.READABLE_CLASS);
buffer.printf("@Override%n");
buffer.printf("public void read(Readable _readable, short _version) {%n");
buffer.printf("public final void read(Readable _readable, short _version) {%n");
buffer.incrementIndent();
VersionConditional.forVersions(parentVersions, struct.versions()).
allowMembershipCheckAlwaysFalse(false).

2
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java

@@ -97,7 +97,7 @@ public class MetadataBatchLoader {
*
* @param image Metadata image to reset this batch loader's state to.
*/
public void resetToImage(MetadataImage image) {
public final void resetToImage(MetadataImage image) {
this.image = image;
this.hasSeenRecord = true;
this.delta = new MetadataDelta.Builder().setImage(image).build();

1
raft/src/test/java/org/apache/kafka/raft/MockExpirationService.java

@@ -28,6 +28,7 @@ public class MockExpirationService implements ExpirationService, MockTime.Listen
private final MockTime time;
private final PriorityQueue<ExpirationFuture<?>> queue = new PriorityQueue<>();
@SuppressWarnings("this-escape")
public MockExpirationService(MockTime time) {
this.time = time;
time.addListener(this);

2
server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java

@@ -22,6 +22,8 @@ package org.apache.kafka.server.fault;
* An exception thrown by a fault handler.
*/
public class FaultHandlerException extends RuntimeException {
@SuppressWarnings("this-escape")
public FaultHandlerException(String failureMessage, Throwable cause) {
super(failureMessage, cause);
// If a cause exception was provided, set our the stack trace its stack trace. This is

1
server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java

@@ -46,6 +46,7 @@ public abstract class ShutdownableThread extends Thread {
this(name, isInterruptible, "[" + name + "]: ");
}
@SuppressWarnings("this-escape")
public ShutdownableThread(String name, boolean isInterruptible, String logPrefix) {
super(name);
this.isInterruptible = isInterruptible;

2
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java

@@ -32,7 +32,7 @@ public abstract class TimerTask implements Runnable {
}
}
void setTimerTaskEntry(TimerTaskEntry entry) {
final void setTimerTaskEntry(TimerTaskEntry entry) {
synchronized (this) {
// if this timerTask is already held by an existing timer task entry,
// we will remove such an entry first.

1
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java

@@ -23,6 +23,7 @@ public class TimerTaskEntry {
TimerTaskEntry next;
TimerTaskEntry prev;
@SuppressWarnings("this-escape")
public TimerTaskEntry(
TimerTask timerTask,
long expirationMs

1
server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java

@@ -48,6 +48,7 @@ public class TimelineInteger implements Revertable {
private final SnapshotRegistry snapshotRegistry;
private int value;
@SuppressWarnings("this-escape")
public TimelineInteger(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
this.value = INIT;

1
server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java

@@ -48,6 +48,7 @@ public class TimelineLong implements Revertable {
private final SnapshotRegistry snapshotRegistry;
private long value;
@SuppressWarnings("this-escape")
public TimelineLong(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
this.value = INIT;

1
server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java

@@ -52,6 +52,7 @@ public class TimelineObject<T> implements Revertable {
private final T initialValue;
private T value;
@SuppressWarnings("this-escape")
public TimelineObject(SnapshotRegistry snapshotRegistry, T initialValue) {
Objects.requireNonNull(initialValue);
this.snapshotRegistry = snapshotRegistry;

1
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java

@@ -30,6 +30,7 @@ public class MockTime extends org.apache.kafka.common.utils.MockTime {
this(System.currentTimeMillis(), System.nanoTime());
}
@SuppressWarnings("this-escape")
public MockTime(long currentTimeMs, long currentHiResTimeNs) {
super(0L, currentTimeMs, currentHiResTimeNs);
scheduler = new MockScheduler(this);

3
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java

@@ -38,6 +38,7 @@ public class FileBasedRemoteLogMetadataCache extends RemoteLogMetadataCache {
private final RemoteLogMetadataSnapshotFile snapshotFile;
private final TopicIdPartition topicIdPartition;
@SuppressWarnings("this-escape")
public FileBasedRemoteLogMetadataCache(TopicIdPartition topicIdPartition,
Path partitionDir) {
if (!partitionDir.toFile().exists() || !partitionDir.toFile().isDirectory()) {
@@ -54,7 +55,7 @@ public class FileBasedRemoteLogMetadataCache extends RemoteLogMetadataCache {
}
}
protected void loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapshot snapshot) {
protected final void loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapshot snapshot) {
log.info("Loading snapshot for partition {} is: {}", topicIdPartition, snapshot);
for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : snapshot.remoteLogSegmentMetadataSnapshots()) {
switch (metadataSnapshot.state()) {

6
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java

@@ -65,7 +65,7 @@ public class RemoteLogMetadataSerde {
return MetadataRecordType.fromId(apiKey).newMetadataRecord();
}
protected Map<Short, RemoteLogMetadataTransform> createRemoteLogMetadataTransforms() {
protected final Map<Short, RemoteLogMetadataTransform> createRemoteLogMetadataTransforms() {
Map<Short, RemoteLogMetadataTransform> map = new HashMap<>();
map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform());
map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform());
@@ -74,7 +74,7 @@ public class RemoteLogMetadataSerde {
return map;
}
protected Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
protected final Map<String, Short> createRemoteLogStorageClassToApiKeyMap() {
Map<String, Short> map = new HashMap<>();
map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
@@ -123,4 +123,4 @@ public class RemoteLogMetadataSerde {
remoteLogMetadataSerde.deserialize(consumerRecord.value()).toString());
}
}
}
}

1
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java

@@ -56,6 +56,7 @@ public class LeaderEpochFileCache {
* @param topicPartition the associated topic partition
* @param checkpoint the checkpoint file
*/
@SuppressWarnings("this-escape")
public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) {
this.checkpoint = checkpoint;
this.topicPartition = topicPartition;

1
storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java

@@ -72,6 +72,7 @@ public abstract class AbstractIndex implements Closeable {
* @param baseOffset the base offset of the segment that this index is corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
@SuppressWarnings("this-escape")
public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {
Objects.requireNonNull(file);
this.file = file;

2
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java

@@ -343,7 +343,7 @@ public class LogConfig extends AbstractConfig {
this(props, Collections.emptySet());
}
@SuppressWarnings("deprecation")
@SuppressWarnings({"deprecation", "this-escape"})
public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
super(CONFIG, props, false);
this.props = Collections.unmodifiableMap(props);

1
storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java

@@ -65,6 +65,7 @@ public class OffsetIndex extends AbstractIndex {
this(file, baseOffset, maxIndexSize, true);
}
@SuppressWarnings("this-escape")
public OffsetIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {
super(file, baseOffset, maxIndexSize, writable);

1
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java

@@ -60,6 +60,7 @@ public class TimeIndex extends AbstractIndex {
this(file, baseOffset, maxIndexSize, true);
}
@SuppressWarnings("this-escape")
public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {
super(file, baseOffset, maxIndexSize, writable);

1
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -865,6 +865,7 @@ public class KafkaStreams implements AutoCloseable {
this(topologyMetadata, applicationConfigs, clientSupplier, Time.SYSTEM);
}
@SuppressWarnings("this-escape")
private KafkaStreams(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfigs,
final KafkaClientSupplier clientSupplier,

1
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java

@@ -85,6 +85,7 @@ public class StreamsBuilder {
*
* @param topologyConfigs the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
*/
@SuppressWarnings("this-escape")
public StreamsBuilder(final TopologyConfig topologyConfigs) {
topology = getNewTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;

1
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@@ -1346,6 +1346,7 @@ public class StreamsConfig extends AbstractConfig {
this(props, true);
}
@SuppressWarnings("this-escape")
protected StreamsConfig(final Map<?, ?> props,
final boolean doLog) {
super(CONFIG, props, doLog);

1
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java

@@ -125,6 +125,7 @@ public class TopologyConfig extends AbstractConfig {
this(null, globalAppConfigs, new Properties());
}
@SuppressWarnings("this-escape")
public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {
super(CONFIG, topologyOverrides, false);

1
streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java

@@ -33,6 +33,7 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
this(materialized, null, null);
}
@SuppressWarnings("this-escape")
public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix) {

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

@@ -435,7 +435,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
return Collections.unmodifiableMap(checkpointFileCache);
}
public String changelogFor(final String storeName) {
public final String changelogFor(final String storeName) {
return storeToChangelogTopic.get(storeName);
}
}

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

@@ -59,6 +59,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext<Object, Objec
final Map<String, DirtyEntryFlushListener> cacheNameToFlushListener = new HashMap<>();
@SuppressWarnings("this-escape")
public ProcessorContextImpl(final TaskId id,
final StreamsConfig config,
final ProcessorStateManager stateMgr,

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@@ -110,7 +110,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private boolean hasPendingTxCommit = false;
private Optional<Long> timeCurrentIdlingStarted;
@SuppressWarnings("rawtypes")
@SuppressWarnings({"rawtypes", "this-escape"})
public StreamTask(final TaskId id,
final Set<TopicPartition> inputPartitions,
final ProcessorTopology topology,

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@@ -465,6 +465,7 @@ public class StreamThread extends Thread {
}
}
@SuppressWarnings("this-escape")
public StreamThread(final Time time,
final StreamsConfig config,
final Admin adminClient,

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java

@@ -72,6 +72,7 @@ public class DefaultTaskManager implements TaskManager {
}
}
@SuppressWarnings("this-escape")
public DefaultTaskManager(final Time time,
final String clientId,
final TasksRegistry tasks,

2
streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java

@@ -63,7 +63,7 @@ public class KafkaEmbedded {
* broker should listen to. Note that you cannot change the `log.dirs` setting
* currently.
*/
@SuppressWarnings("WeakerAccess")
@SuppressWarnings({"WeakerAccess", "this-escape"})
public KafkaEmbedded(final Properties config, final MockTime time) throws IOException {
tmpFolder = org.apache.kafka.test.TestUtils.tempDirectory();
logDir = org.apache.kafka.test.TestUtils.tempDirectory(tmpFolder.toPath(), "log");

1
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

@@ -91,6 +91,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
@SuppressWarnings("this-escape")
public class StoreChangelogReaderTest extends EasyMockSupport {
@Rule

1
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java

@@ -74,6 +74,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@SuppressWarnings("this-escape")
public class MeteredTimestampedKeyValueStoreTest {
@Rule

3
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java

@@ -51,6 +51,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
@RunWith(Parameterized.class)
@SuppressWarnings("this-escape")
public class TimestampedWindowStoreBuilderTest {
private static final String TIMESTAMP_STORE_NAME = "Timestamped Store";
private static final String TIMEORDERED_STORE_NAME = "TimeOrdered Store";
@@ -257,4 +258,4 @@ public class TimestampedWindowStoreBuilderTest {
assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null"));
}
}
}

1
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java

@@ -213,6 +213,7 @@ public class InternalMockProcessorContext<KOut, VOut>
this(stateDir, keySerde, valueSerde, metrics, config, collectorSupplier, cache, time, new TaskId(0, 0));
}
@SuppressWarnings("this-escape")
public InternalMockProcessorContext(final File stateDir,
final Serde<?> keySerde,
final Serde<?> valueSerde,

1
streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java

@@ -41,6 +41,7 @@ public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>();
@SuppressWarnings("this-escape")
public MockRestoreConsumer(final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
super(OffsetResetStrategy.EARLIEST);

1
trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConfigurableProducerSpec.java

@@ -119,6 +119,7 @@ public class ConfigurableProducerSpec extends TaskSpec {
private final TopicsSpec activeTopic;
private final int activePartition;
@SuppressWarnings("this-escape")
@JsonCreator
public ConfigurableProducerSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,

1
trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java

@@ -46,6 +46,7 @@ public class ConnectionStressSpec extends TaskSpec {
FETCH_METADATA
}
@SuppressWarnings("this-escape")
@JsonCreator
public ConnectionStressSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,

1
trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java

@@ -104,6 +104,7 @@ public class ConsumeBenchSpec extends TaskSpec {
private final int threadsPerWorker;
private final Optional<RecordProcessor> recordProcessor;
@SuppressWarnings("this-escape")
@JsonCreator
public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,

1
trogdor/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java

@@ -74,6 +74,7 @@ public class ProduceBenchSpec extends TaskSpec {
private final boolean useConfiguredPartitioner;
private final boolean skipFlush;
@SuppressWarnings("this-escape")
@JsonCreator
public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,

1
trogdor/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java

@@ -42,6 +42,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
@SuppressWarnings("this-escape")
@JsonCreator
public RoundTripWorkloadSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,

1
trogdor/src/main/java/org/apache/kafka/trogdor/workload/SustainedConnectionSpec.java

@@ -80,6 +80,7 @@ public class SustainedConnectionSpec extends TaskSpec {
private final int numThreads;
private final int refreshRateMs;
@SuppressWarnings("this-escape")
@JsonCreator
public SustainedConnectionSpec(
@JsonProperty("startMs") long startMs,
