Browse Source

MINOR: Standardize controller log4j output for replaying records

Standardize controller log4j output for replaying important records. The log message should include
word "replayed" to make it clear that this is a record replay. Log the replay of records for ACLs,
client quotas, and producer IDs, which were previously not logged. Also fix a case where we weren't
logging changes to broker registrations.

AclControlManager, ClientQuotaControlManager, and ProducerIdControlManager didn't previously have a
log4j logger object, so this PR adds one. It also converts them to using Builder objects. This
makes junit tests more readable because we don't need to specify paramaters where the test can use
the default (like LogContexts).

Throw an exception in replay if we get another TopicRecord for a topic which already exists.
cmccabe_2023-05-10_cleanup
Colin P. McCabe 1 year ago
parent
commit
5a1aa1a670
  1. 53
      metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
  2. 34
      metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
  3. 12
      metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
  4. 6
      metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
  5. 21
      metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
  6. 47
      metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
  7. 21
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  8. 27
      metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
  9. 10
      metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java
  10. 13
      metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
  11. 14
      metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
  12. 2
      metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java
  13. 5
      metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
  14. 22
      metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

53
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java

@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.UnknownServerException; @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAclWithId;
@ -39,6 +40,7 @@ import org.apache.kafka.server.mutable.BoundedListTooLongException; @@ -39,6 +40,7 @@ import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
@ -68,12 +70,44 @@ import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_ @@ -68,12 +70,44 @@ import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_
* completed, which is another reason the prepare / complete callbacks are needed.
*/
public class AclControlManager {
static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
Builder setClusterMetadataAuthorizer(Optional<ClusterMetadataAuthorizer> authorizer) {
this.authorizer = authorizer;
return this;
}
AclControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
return new AclControlManager(logContext, snapshotRegistry, authorizer);
}
}
private final Logger log;
private final TimelineHashMap<Uuid, StandardAcl> idToAcl;
private final TimelineHashSet<StandardAcl> existingAcls;
private final Optional<ClusterMetadataAuthorizer> authorizer;
AclControlManager(SnapshotRegistry snapshotRegistry,
Optional<ClusterMetadataAuthorizer> authorizer) {
AclControlManager(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
Optional<ClusterMetadataAuthorizer> authorizer
) {
this.log = logContext.logger(AclControlManager.class);
this.idToAcl = new TimelineHashMap<>(snapshotRegistry, 0);
this.existingAcls = new TimelineHashSet<>(snapshotRegistry, 0);
this.authorizer = authorizer;
@ -193,8 +227,10 @@ public class AclControlManager { @@ -193,8 +227,10 @@ public class AclControlManager {
}
}
public void replay(AccessControlEntryRecord record,
Optional<OffsetAndEpoch> snapshotId) {
public void replay(
AccessControlEntryRecord record,
Optional<OffsetAndEpoch> snapshotId
) {
StandardAclWithId aclWithId = StandardAclWithId.fromRecord(record);
idToAcl.put(aclWithId.id(), aclWithId.acl());
existingAcls.add(aclWithId.acl());
@ -203,10 +239,14 @@ public class AclControlManager { @@ -203,10 +239,14 @@ public class AclControlManager {
a.addAcl(aclWithId.id(), aclWithId.acl());
});
}
log.info("Replayed AccessControlEntryRecord for {}, setting {}", record.id(),
aclWithId.acl());
}
public void replay(RemoveAccessControlEntryRecord record,
Optional<OffsetAndEpoch> snapshotId) {
public void replay(
RemoveAccessControlEntryRecord record,
Optional<OffsetAndEpoch> snapshotId
) {
StandardAcl acl = idToAcl.remove(record.id());
if (acl == null) {
throw new RuntimeException("Unable to replay " + record + ": no acl with " +
@ -221,6 +261,7 @@ public class AclControlManager { @@ -221,6 +261,7 @@ public class AclControlManager {
a.removeAcl(record.id());
});
}
log.info("Replayed RemoveAccessControlEntryRecord for {}, removing {}", record.id(), acl);
}
Map<Uuid, StandardAcl> idToAcl() {

34
metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java

@ -26,10 +26,12 @@ import org.apache.kafka.common.protocol.Errors; @@ -26,10 +26,12 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
@ -48,11 +50,38 @@ import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_ @@ -48,11 +50,38 @@ import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_
public class ClientQuotaControlManager {
static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
ClientQuotaControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
return new ClientQuotaControlManager(logContext, snapshotRegistry);
}
}
private final Logger log;
private final SnapshotRegistry snapshotRegistry;
final TimelineHashMap<ClientQuotaEntity, TimelineHashMap<String, Double>> clientQuotaData;
ClientQuotaControlManager(SnapshotRegistry snapshotRegistry) {
ClientQuotaControlManager(
LogContext logContext,
SnapshotRegistry snapshotRegistry
) {
this.log = logContext.logger(ClientQuotaControlManager.class);
this.snapshotRegistry = snapshotRegistry;
this.clientQuotaData = new TimelineHashMap<>(snapshotRegistry, 0);
}
@ -113,8 +142,11 @@ public class ClientQuotaControlManager { @@ -113,8 +142,11 @@ public class ClientQuotaControlManager {
if (quotas.size() == 0) {
clientQuotaData.remove(entity);
}
log.info("Replayed ClientQuotaRecord for {} removing {}.", entity, record.key());
} else {
quotas.put(record.key(), record.value());
log.info("Replayed ClientQuotaRecord for {} setting {} to {}.",
entity, record.key(), record.value());
}
}

12
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

@ -440,11 +440,13 @@ public class ClusterControlManager { @@ -440,11 +440,13 @@ public class ClusterControlManager {
heartbeatManager.register(brokerId, record.fenced());
}
if (prevRegistration == null) {
log.info("Registered new broker: {}", record);
log.info("Replayed initial RegisterBrokerRecord for broker {}: {}", record.brokerId(), record);
} else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
log.info("Re-registered broker incarnation: {}", record);
log.info("Replayed RegisterBrokerRecord modifying the registration for broker {}: {}",
record.brokerId(), record);
} else {
log.info("Re-registered broker id {}: {}", brokerId, record);
log.info("Replayed RegisterBrokerRecord establishing a new incarnation of broker {}: {}",
record.brokerId(), record);
}
}
@ -461,7 +463,7 @@ public class ClusterControlManager { @@ -461,7 +463,7 @@ public class ClusterControlManager {
} else {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
brokerRegistrations.remove(brokerId);
log.info("Unregistered broker: {}", record);
log.info("Replayed {}", record);
}
}
@ -523,6 +525,8 @@ public class ClusterControlManager { @@ -523,6 +525,8 @@ public class ClusterControlManager {
inControlledShutdownChange
);
if (!curRegistration.equals(nextRegistration)) {
log.info("Replayed {} modifying the registration for broker {}: {}",
record.getClass().getSimpleName(), brokerId, record);
brokerRegistrations.put(brokerId, nextRegistration);
} else {
log.info("Ignoring no-op registration change for {}", curRegistration);

6
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java

@ -416,9 +416,11 @@ public class ConfigurationControlManager { @@ -416,9 +416,11 @@ public class ConfigurationControlManager {
configData.remove(configResource);
}
if (configSchema.isSensitive(record)) {
log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN);
log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
configResource, record.name(), Password.HIDDEN);
} else {
log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
configResource, record.name(), record.value());
}
}

21
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java

@ -327,24 +327,31 @@ public class FeatureControlManager { @@ -327,24 +327,31 @@ public class FeatureControlManager {
}
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
MetadataVersion mv = MetadataVersion.fromFeatureLevel(record.featureLevel());
log.info("Setting metadata version to {}", mv);
metadataVersion.set(mv);
log.info("Replayed a FeatureLevelRecord setting metadata version to {}", mv);
} else {
if (record.featureLevel() == 0) {
log.info("Removing feature {}", record.name());
finalizedVersions.remove(record.name());
log.info("Replayed a FeatureLevelRecord removing feature {}", record.name());
} else {
log.info("Setting feature {} to {}", record.name(), record.featureLevel());
finalizedVersions.put(record.name(), record.featureLevel());
log.info("Replayed a FeatureLevelRecord setting feature {} to {}",
record.name(), record.featureLevel());
}
}
}
public void replay(ZkMigrationStateRecord record) {
ZkMigrationState recordState = ZkMigrationState.of(record.zkMigrationState());
ZkMigrationState currentState = migrationControlState.get();
log.info("Transitioning ZK migration state from {} to {}", currentState, recordState);
migrationControlState.set(recordState);
ZkMigrationState newState = ZkMigrationState.of(record.zkMigrationState());
ZkMigrationState previousState = migrationControlState.get();
if (previousState.equals(newState)) {
log.debug("Replayed a ZkMigrationStateRecord which did not alter the state from {}.",
previousState);
} else {
migrationControlState.set(newState);
log.info("Replayed a ZkMigrationStateRecord changing the migration state from {} to {}.",
previousState, newState);
}
}
boolean isControllerId(int nodeId) {

47
metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java

@ -19,21 +19,62 @@ package org.apache.kafka.controller; @@ -19,21 +19,62 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;
import java.util.Collections;
public class ProducerIdControlManager {
static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private ClusterControlManager clusterControlManager = null;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
Builder setClusterControlManager(ClusterControlManager clusterControlManager) {
this.clusterControlManager = clusterControlManager;
return this;
}
ProducerIdControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (clusterControlManager == null) {
throw new RuntimeException("You must specify ClusterControlManager.");
}
return new ProducerIdControlManager(
logContext,
clusterControlManager,
snapshotRegistry);
}
}
private final Logger log;
private final ClusterControlManager clusterControlManager;
private final TimelineObject<ProducerIdsBlock> nextProducerBlock;
private final TimelineLong brokerEpoch;
ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
private ProducerIdControlManager(
LogContext logContext,
ClusterControlManager clusterControlManager,
SnapshotRegistry snapshotRegistry
) {
this.log = logContext.logger(ProducerIdControlManager.class);
this.clusterControlManager = clusterControlManager;
this.nextProducerBlock = new TimelineObject<>(snapshotRegistry, ProducerIdsBlock.EMPTY);
this.brokerEpoch = new TimelineLong(snapshotRegistry);
@ -71,7 +112,9 @@ public class ProducerIdControlManager { @@ -71,7 +112,9 @@ public class ProducerIdControlManager {
throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
" is not greater than current next Producer ID in block (" + nextBlock + ")");
} else {
nextProducerBlock.set(new ProducerIdsBlock(record.brokerId(), record.nextProducerId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
log.info("Replaying ProducerIdsRecord {}", record);
nextProducerBlock.set(new ProducerIdsBlock(record.brokerId(), record.nextProducerId(),
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
brokerEpoch.set(record.brokerEpoch());
}
}

21
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@ -1850,7 +1850,10 @@ public final class QuorumController implements Controller { @@ -1850,7 +1850,10 @@ public final class QuorumController implements Controller {
setStaticConfig(staticConfig).
setNodeId(nodeId).
build();
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clientQuotaControlManager = new ClientQuotaControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
build();
this.featureControl = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(quorumFeatures).
@ -1872,7 +1875,11 @@ public final class QuorumController implements Controller { @@ -1872,7 +1875,11 @@ public final class QuorumController implements Controller {
setFeatureControlManager(featureControl).
setZkMigrationEnabled(zkMigrationEnabled).
build();
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setClusterControlManager(clusterControl).
build();
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
this.maxIdleIntervalNs = maxIdleIntervalNs;
this.replicationControl = new ReplicationControlManager.Builder().
@ -1892,10 +1899,14 @@ public final class QuorumController implements Controller { @@ -1892,10 +1899,14 @@ public final class QuorumController implements Controller {
build();
this.authorizer = authorizer;
authorizer.ifPresent(a -> a.setAclMutator(this));
this.aclControlManager = new AclControlManager(snapshotRegistry, authorizer);
this.aclControlManager = new AclControlManager.Builder().
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
setClusterMetadataAuthorizer(authorizer).
build();
this.logReplayTracker = new LogReplayTracker.Builder().
setLogContext(logContext).
build();
setLogContext(logContext).
build();
this.raftClient = raftClient;
this.bootstrapMetadata = bootstrapMetadata;
this.maxRecordsPerBatch = maxRecordsPerBatch;

27
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@ -379,7 +379,19 @@ public class ReplicationControlManager { @@ -379,7 +379,19 @@ public class ReplicationControlManager {
}
public void replay(TopicRecord record) {
topicsByName.put(record.name(), record.topicId());
Uuid existingUuid = topicsByName.put(record.name(), record.topicId());
if (existingUuid != null) {
// We don't currently support sending a second TopicRecord for the same topic name...
// unless, of course, there is a RemoveTopicRecord in between.
if (existingUuid.equals(record.topicId())) {
throw new RuntimeException("Found duplicate TopicRecord for " + record.name() +
" with topic ID " + record.topicId());
} else {
throw new RuntimeException("Found duplicate TopicRecord for " + record.name() +
" with a different ID than before. Previous ID was " + existingUuid +
" and new ID is " + record.topicId());
}
}
if (Topic.hasCollisionChars(record.name())) {
String normalizedName = Topic.unifyCollisionChars(record.name());
TimelineHashSet<String> topicNames = topicsWithCollisionChars.get(normalizedName);
@ -391,7 +403,7 @@ public class ReplicationControlManager { @@ -391,7 +403,7 @@ public class ReplicationControlManager {
}
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
log.info("Created topic {} with topic ID {}.", record.name(), record.topicId());
log.info("Replayed TopicRecord for topic {} with topic ID {}.", record.name(), record.topicId());
}
public void replay(PartitionRecord record) {
@ -405,13 +417,16 @@ public class ReplicationControlManager { @@ -405,13 +417,16 @@ public class ReplicationControlManager {
String description = topicInfo.name + "-" + record.partitionId() +
" with topic ID " + record.topicId();
if (prevPartInfo == null) {
log.info("Created partition {} and {}.", description, newPartInfo);
log.info("Replayed PartitionRecord for new partition {} and {}.", description,
newPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
false, isReassignmentInProgress(newPartInfo));
} else if (!newPartInfo.equals(prevPartInfo)) {
log.info("Replayed PartitionRecord for existing partition {} and {}.", description,
newPartInfo);
newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
@ -475,8 +490,8 @@ public class ReplicationControlManager { @@ -475,8 +490,8 @@ public class ReplicationControlManager {
if (record.removingReplicas() != null || record.addingReplicas() != null) {
log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
} else if (log.isTraceEnabled()) {
log.trace("Replayed partition change {} for topic {}", record, topicInfo.name);
} else if (log.isDebugEnabled()) {
log.debug("Replayed partition change {} for topic {}", record, topicInfo.name);
}
}
@ -516,7 +531,7 @@ public class ReplicationControlManager { @@ -516,7 +531,7 @@ public class ReplicationControlManager {
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
log.info("Replayed RemoveTopicRecord for topic {} with ID {}.", topic.name, record.topicId());
}
ControllerResult<CreateTopicsResponseData> createTopics(

10
metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java

@ -322,7 +322,7 @@ public class ScramControlManager { @@ -322,7 +322,7 @@ public class ScramControlManager {
if (credentials.remove(key) == null) {
throw new RuntimeException("Unable to find credential to delete: " + key);
}
log.info("Removed SCRAM credential for {} with mechanism {}.",
log.info("Replayed RemoveUserScramCredentialRecord for {} with mechanism {}.",
key.username, key.mechanism);
}
@ -334,11 +334,11 @@ public class ScramControlManager { @@ -334,11 +334,11 @@ public class ScramControlManager {
record.serverKey(),
record.iterations());
if (credentials.put(key, value) == null) {
log.info("Created new SCRAM credential for {} with mechanism {}.",
key.username, key.mechanism);
log.info("Replayed UserScramCredentialRecord creating new entry for {} with " +
"mechanism {}.", key.username, key.mechanism);
} else {
log.info("Modified SCRAM credential for {} with mechanism {}.",
key.username, key.mechanism);
log.info("Replayed UserScramCredentialRecord modifying existing entry for {} " +
"with mechanism {}.", key.username, key.mechanism);
}
}

13
metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java

@ -203,7 +203,9 @@ public class AclControlManagerTest { @@ -203,7 +203,9 @@ public class AclControlManagerTest {
public void testLoadSnapshot() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
snapshotRegistry.getOrCreateSnapshot(0);
AclControlManager manager = new AclControlManager(snapshotRegistry, Optional.empty());
AclControlManager manager = new AclControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
build();
// Load TEST_ACLS into the AclControlManager.
Set<ApiMessageAndVersion> loadedAcls = new HashSet<>();
@ -236,8 +238,7 @@ public class AclControlManagerTest { @@ -236,8 +238,7 @@ public class AclControlManagerTest {
@Test
public void testAddAndDelete() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
AclControlManager manager = new AclControlManager(snapshotRegistry, Optional.empty());
AclControlManager manager = new AclControlManager.Builder().build();
MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
authorizer.loadSnapshot(manager.idToAcl());
manager.replay(StandardAclWithIdTest.TEST_ACLS.get(0).toRecord(), Optional.empty());
@ -248,8 +249,7 @@ public class AclControlManagerTest { @@ -248,8 +249,7 @@ public class AclControlManagerTest {
@Test
public void testCreateAclDeleteAcl() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
AclControlManager manager = new AclControlManager(snapshotRegistry, Optional.empty());
AclControlManager manager = new AclControlManager.Builder().build();
MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
authorizer.loadSnapshot(manager.idToAcl());
@ -311,8 +311,7 @@ public class AclControlManagerTest { @@ -311,8 +311,7 @@ public class AclControlManagerTest {
@Test
public void testDeleteDedupe() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
AclControlManager manager = new AclControlManager(snapshotRegistry, Optional.empty());
AclControlManager manager = new AclControlManager.Builder().build();
MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
authorizer.loadSnapshot(manager.idToAcl());

14
metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java

@ -25,10 +25,8 @@ import org.apache.kafka.common.protocol.Errors; @@ -25,10 +25,8 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -52,8 +50,7 @@ public class ClientQuotaControlManagerTest { @@ -52,8 +50,7 @@ public class ClientQuotaControlManagerTest {
@Test
public void testInvalidEntityTypes() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClientQuotaControlManager manager = new ClientQuotaControlManager(snapshotRegistry);
ClientQuotaControlManager manager = new ClientQuotaControlManager.Builder().build();
// Unknown type "foo"
assertInvalidEntity(manager, entity("foo", "bar"));
@ -79,8 +76,7 @@ public class ClientQuotaControlManagerTest { @@ -79,8 +76,7 @@ public class ClientQuotaControlManagerTest {
@Test
public void testInvalidQuotaKeys() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClientQuotaControlManager manager = new ClientQuotaControlManager(snapshotRegistry);
ClientQuotaControlManager manager = new ClientQuotaControlManager.Builder().build();
ClientQuotaEntity entity = entity(ClientQuotaEntity.USER, "user-1");
// Invalid + valid keys
@ -103,8 +99,7 @@ public class ClientQuotaControlManagerTest { @@ -103,8 +99,7 @@ public class ClientQuotaControlManagerTest {
@Test
public void testAlterAndRemove() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClientQuotaControlManager manager = new ClientQuotaControlManager(snapshotRegistry);
ClientQuotaControlManager manager = new ClientQuotaControlManager.Builder().build();
ClientQuotaEntity userEntity = userEntity("user-1");
List<ClientQuotaAlteration> alters = new ArrayList<>();
@ -178,8 +173,7 @@ public class ClientQuotaControlManagerTest { @@ -178,8 +173,7 @@ public class ClientQuotaControlManagerTest {
@Test
public void testEntityTypes() throws Exception {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClientQuotaControlManager manager = new ClientQuotaControlManager(snapshotRegistry);
ClientQuotaControlManager manager = new ClientQuotaControlManager.Builder().build();
Map<ClientQuotaEntity, Map<String, Double>> quotasToTest = new HashMap<>();
quotasToTest.put(userClientEntity("user-1", "client-id-1"),

2
metadata/src/test/java/org/apache/kafka/controller/MockAclControlManager.java

@ -33,7 +33,7 @@ import java.util.Optional; @@ -33,7 +33,7 @@ import java.util.Optional;
public class MockAclControlManager extends AclControlManager {
public MockAclControlManager(LogContext logContext,
Optional<ClusterMetadataAuthorizer> authorizer) {
super(new SnapshotRegistry(logContext), authorizer);
super(logContext, new SnapshotRegistry(logContext), authorizer);
}
public List<AclCreateResult> createAndReplayAcls(List<AclBinding> acls) {

5
metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java

@ -72,7 +72,10 @@ public class ProducerIdControlManagerTest { @@ -72,7 +72,10 @@ public class ProducerIdControlManagerTest {
clusterControl.replay(brokerRecord, 100L);
}
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setClusterControlManager(clusterControl).
setSnapshotRegistry(snapshotRegistry).
build();
}
@Test

22
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

@ -2571,4 +2571,26 @@ public class ReplicationControlManagerTest { @@ -2571,4 +2571,26 @@ public class ReplicationControlManagerTest {
return Arrays.stream(isr).map(brokerId -> brokerState(brokerId, defaultBrokerEpoch(brokerId)))
.collect(Collectors.toList());
}
@Test
public void testDuplicateTopicIdReplay() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
replicationControl.replay(new TopicRecord().
setName("foo").
setTopicId(Uuid.fromString("Ktv3YkMQRe-MId4VkkrMyw")));
assertEquals("Found duplicate TopicRecord for foo with topic ID Ktv3YkMQRe-MId4VkkrMyw",
assertThrows(RuntimeException.class,
() -> replicationControl.replay(new TopicRecord().
setName("foo").
setTopicId(Uuid.fromString("Ktv3YkMQRe-MId4VkkrMyw")))).
getMessage());
assertEquals("Found duplicate TopicRecord for foo with a different ID than before. " +
"Previous ID was Ktv3YkMQRe-MId4VkkrMyw and new ID is 8auUWq8zQqe_99H_m2LAmw",
assertThrows(RuntimeException.class,
() -> replicationControl.replay(new TopicRecord().
setName("foo").
setTopicId(Uuid.fromString("8auUWq8zQqe_99H_m2LAmw")))).
getMessage());
}
}

Loading…
Cancel
Save