Browse Source

KAFKA-15048: Improve handling of unexpected quorum controller errors (#13799)

When the active quorum controller encounters an "unexpected" error, such as a NullPointerException,
it currently resigns its leadership. With this PR, it still resigns, but it also increments the
metadata error count metric. This will allow us to better track down these errors.

This PR also fixes a minor bug where performing read operations on a standby controller would
result in an unexpected RuntimeException. The bug happened because the standby controller does not
take in-memory snapshots, and read operations were attempting to read from the epoch of the latest
committed offset. The fix is for the standby controller to simply read the latest value of each
data structure. This is always safe, because standby controllers don't contain uncommitted data.

Also, fix a bug where listPartitionReassignments was reading the latest data, rather than data from
the last committed offset.

Reviewers: dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>
pull/13805/head
Colin Patrick McCabe 1 year ago committed by GitHub
parent
commit
146a6976ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      core/src/main/scala/kafka/server/ControllerServer.scala
  2. 16
      core/src/main/scala/kafka/server/SharedServer.scala
  3. 109
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  4. 8
      metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
  5. 109
      metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
  6. 7
      metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
  7. 28
      metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
  8. 108
      metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java

3
core/src/main/scala/kafka/server/ControllerServer.scala

@ -240,7 +240,8 @@ class ControllerServer( @@ -240,7 +240,8 @@ class ControllerServer(
setConfigurationValidator(new ControllerConfigurationValidator()).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
setFatalFaultHandler(sharedServer.quorumControllerFaultHandler).
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).
setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler).
setZkMigrationEnabled(config.migrationEnabled)
}
authorizer match {

16
core/src/main/scala/kafka/server/SharedServer.scala

@ -193,14 +193,24 @@ class SharedServer( @@ -193,14 +193,24 @@ class SharedServer(
})
/**
* The fault handler to use when the QuorumController experiences a fault.
* The fault handler to use when the QuorumController experiences a fatal fault.
*/
def quorumControllerFaultHandler: FaultHandler = faultHandlerFactory.build(
def fatalQuorumControllerFaultHandler: FaultHandler = faultHandlerFactory.build(
name = "quorum controller",
fatal = true,
action = () => SharedServer.this.synchronized {
Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
snapshotsDisabledReason.compareAndSet(null, "quorum controller fault")
snapshotsDisabledReason.compareAndSet(null, "quorum controller fatal fault")
})
/**
* The fault handler to use when the QuorumController experiences a non-fatal fault.
*/
def nonFatalQuorumControllerFaultHandler: FaultHandler = faultHandlerFactory.build(
name = "quorum controller",
fatal = false,
action = () => SharedServer.this.synchronized {
Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
})
/**

109
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@ -27,9 +27,7 @@ import org.apache.kafka.common.errors.ApiException; @@ -27,9 +27,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
@ -106,7 +104,6 @@ import org.apache.kafka.server.authorizer.AclDeleteResult; @@ -106,7 +104,6 @@ import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
@ -130,7 +127,6 @@ import java.util.Random; @@ -130,7 +127,6 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@ -181,6 +177,7 @@ public final class QuorumController implements Controller { @@ -181,6 +177,7 @@ public final class QuorumController implements Controller {
static public class Builder {
private final int nodeId;
private final String clusterId;
private FaultHandler nonFatalFaultHandler = null;
private FaultHandler fatalFaultHandler = null;
private Time time = Time.SYSTEM;
private String threadNamePrefix = null;
@ -209,6 +206,11 @@ public final class QuorumController implements Controller { @@ -209,6 +206,11 @@ public final class QuorumController implements Controller {
this.clusterId = clusterId;
}
public Builder setNonFatalFaultHandler(FaultHandler nonFatalFaultHandler) {
this.nonFatalFaultHandler = nonFatalFaultHandler;
return this;
}
public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) {
this.fatalFaultHandler = fatalFaultHandler;
return this;
@ -331,6 +333,8 @@ public final class QuorumController implements Controller { @@ -331,6 +333,8 @@ public final class QuorumController implements Controller {
throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
} else if (quorumFeatures == null) {
throw new IllegalStateException("You must specify the quorum features");
} else if (nonFatalFaultHandler == null) {
throw new IllegalStateException("You must specify a non-fatal fault handler.");
} else if (fatalFaultHandler == null) {
throw new IllegalStateException("You must specify a fatal fault handler.");
}
@ -349,6 +353,7 @@ public final class QuorumController implements Controller { @@ -349,6 +353,7 @@ public final class QuorumController implements Controller {
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
return new QuorumController(
nonFatalFaultHandler,
fatalFaultHandler,
logContext,
nodeId,
@ -425,25 +430,23 @@ public final class QuorumController implements Controller { @@ -425,25 +430,23 @@ public final class QuorumController implements Controller {
public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler";
private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
"The active controller appears to be node ";
private NotControllerException newNotControllerException() {
OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
if (latestController.isPresent()) {
return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
latestController.getAsInt() + ".");
} else {
return new NotControllerException("No controller appears to be active.");
}
private OptionalInt latestController() {
return raftClient.leaderAndEpoch().leaderId();
}
private NotControllerException newPreMigrationException() {
OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
if (latestController.isPresent()) {
return new NotControllerException("The controller is in pre-migration mode.");
/**
* @return The offset that we should perform read operations at.
*/
private long currentReadOffset() {
if (isActiveController()) {
// The active controller keeps an in-memory snapshot at the last committed offset,
// which we want to read from when performing read operations. This will avoid
// reading uncommitted data.
return lastCommittedOffset;
} else {
return new NotControllerException("No controller appears to be active.");
// Standby controllers never have uncommitted data in memory. Therefore, we always
// read the latest from every data structure.
return SnapshotRegistry.LATEST_EPOCH;
}
}
@ -458,41 +461,35 @@ public final class QuorumController implements Controller { @@ -458,41 +461,35 @@ public final class QuorumController implements Controller {
private Throwable handleEventException(String name,
OptionalLong startProcessingTimeNs,
Throwable exception) {
Throwable externalException =
ControllerExceptions.toExternalException(exception, () -> latestController());
if (!startProcessingTimeNs.isPresent()) {
log.error("{}: unable to start processing because of {}. Reason: {}", name,
exception.getClass().getSimpleName(), exception.getMessage());
if (exception instanceof ApiException) {
return exception;
} else {
return new UnknownServerException(exception);
}
return externalException;
}
long endProcessingTime = time.nanoseconds();
long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
if ((exception instanceof ApiException) ||
(exception instanceof BoundedListTooLongException)) {
if (ControllerExceptions.isExpected(exception)) {
log.info("{}: failed with {} in {} us. Reason: {}", name,
exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
if (exception instanceof BoundedListTooLongException) {
exception = new PolicyViolationException("Unable to perform excessively large " +
"batch operation.");
}
return exception;
return externalException;
}
if (isActiveController()) {
log.warn("{}: failed with unknown server exception {} at epoch {} in {} us. " +
"Renouncing leadership and reverting to the last committed offset {}.",
nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " +
"exception %s at epoch %d in %d us. Renouncing leadership and reverting " +
"to the last committed offset %d.",
name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
lastCommittedOffset, exception);
lastCommittedOffset), exception);
renounce();
} else {
log.warn("{}: failed with unknown server exception {} in {} us. " +
"The controller is already in standby mode.",
name, exception.getClass().getSimpleName(), deltaUs,
nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " +
"exception %s in %d us. The controller is already in standby mode.",
name, exception.getClass().getSimpleName(), deltaUs),
exception);
}
return new UnknownServerException(exception);
return externalException;
}
/**
@ -703,11 +700,11 @@ public final class QuorumController implements Controller { @@ -703,11 +700,11 @@ public final class QuorumController implements Controller {
}
int controllerEpoch = curClaimEpoch;
if (!isActiveController(controllerEpoch)) {
throw newNotControllerException();
throw ControllerExceptions.newWrongControllerException(latestController());
}
if (featureControl.inPreMigrationMode() && !flags.contains(RUNS_IN_PREMIGRATION)) {
log.info("Cannot run write operation {} in pre-migration mode. Returning NOT_CONTROLLER.", name);
throw newPreMigrationException();
throw ControllerExceptions.newPreMigrationException(latestController());
}
startProcessingTimeNs = OptionalLong.of(now);
ControllerResult<T> result = op.generateRecordsAndResult();
@ -1320,7 +1317,8 @@ public final class QuorumController implements Controller { @@ -1320,7 +1317,8 @@ public final class QuorumController implements Controller {
raftClient.resign(curClaimEpoch);
curClaimEpoch = -1;
controllerMetrics.setActive(false);
deferredEventQueue.failAll(newNotControllerException());
deferredEventQueue.failAll(ControllerExceptions.
newWrongControllerException(OptionalInt.empty()));
if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
throw new RuntimeException("Unable to find last committed offset " +
@ -1350,8 +1348,7 @@ public final class QuorumController implements Controller { @@ -1350,8 +1348,7 @@ public final class QuorumController implements Controller {
ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, flags);
queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event);
event.future.exceptionally(e -> {
if (e instanceof UnknownServerException && e.getCause() != null &&
e.getCause() instanceof RejectedExecutionException) {
if (ControllerExceptions.isTimeoutException(e)) {
log.error("Cancelling deferred write event {} because the event queue " +
"is now closed.", name);
return null;
@ -1590,6 +1587,11 @@ public final class QuorumController implements Controller { @@ -1590,6 +1587,11 @@ public final class QuorumController implements Controller {
updateLastCommittedState(-1, -1, -1);
}
/**
* Handles faults that cause a controller failover, but which don't abort the process.
*/
private final FaultHandler nonFatalFaultHandler;
/**
* Handles faults that should normally be fatal to the process.
*/
@ -1801,6 +1803,7 @@ public final class QuorumController implements Controller { @@ -1801,6 +1803,7 @@ public final class QuorumController implements Controller {
private final RecordRedactor recordRedactor;
private QuorumController(
FaultHandler nonFatalFaultHandler,
FaultHandler fatalFaultHandler,
LogContext logContext,
int nodeId,
@ -1826,6 +1829,7 @@ public final class QuorumController implements Controller { @@ -1826,6 +1829,7 @@ public final class QuorumController implements Controller {
int maxRecordsPerBatch,
boolean zkMigrationEnabled
) {
this.nonFatalFaultHandler = nonFatalFaultHandler;
this.fatalFaultHandler = fatalFaultHandler;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
@ -1964,7 +1968,7 @@ public final class QuorumController implements Controller { @@ -1964,7 +1968,7 @@ public final class QuorumController implements Controller {
if (names.isEmpty())
return CompletableFuture.completedFuture(Collections.emptyMap());
return appendReadEvent("findTopicIds", context.deadlineNs(),
() -> replicationControl.findTopicIds(lastCommittedOffset, names));
() -> replicationControl.findTopicIds(currentReadOffset(), names));
}
@Override
@ -1972,7 +1976,7 @@ public final class QuorumController implements Controller { @@ -1972,7 +1976,7 @@ public final class QuorumController implements Controller {
ControllerRequestContext context
) {
return appendReadEvent("findAllTopicIds", context.deadlineNs(),
() -> replicationControl.findAllTopicIds(lastCommittedOffset));
() -> replicationControl.findAllTopicIds(currentReadOffset()));
}
@Override
@ -1983,7 +1987,7 @@ public final class QuorumController implements Controller { @@ -1983,7 +1987,7 @@ public final class QuorumController implements Controller {
if (ids.isEmpty())
return CompletableFuture.completedFuture(Collections.emptyMap());
return appendReadEvent("findTopicNames", context.deadlineNs(),
() -> replicationControl.findTopicNames(lastCommittedOffset, ids));
() -> replicationControl.findTopicNames(currentReadOffset(), ids));
}
@Override
@ -2003,7 +2007,7 @@ public final class QuorumController implements Controller { @@ -2003,7 +2007,7 @@ public final class QuorumController implements Controller {
Map<ConfigResource, Collection<String>> resources
) {
return appendReadEvent("describeConfigs", context.deadlineNs(),
() -> configurationControl.describeConfigs(lastCommittedOffset, resources));
() -> configurationControl.describeConfigs(currentReadOffset(), resources));
}
@Override
@ -2024,13 +2028,8 @@ public final class QuorumController implements Controller { @@ -2024,13 +2028,8 @@ public final class QuorumController implements Controller {
public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(
ControllerRequestContext context
) {
// It's possible that we call ApiVersionRequest before consuming the log since ApiVersionRequest is sent when
// initialize NetworkClient, we should not return an error since it would stop the NetworkClient from working correctly.
if (lastCommittedOffset == -1) {
return CompletableFuture.completedFuture(new FinalizedControllerFeatures(Collections.emptyMap(), -1));
}
return appendReadEvent("getFinalizedFeatures", context.deadlineNs(),
() -> featureControl.finalizedFeatures(lastCommittedOffset));
() -> featureControl.finalizedFeatures(currentReadOffset()));
}
@Override
@ -2075,7 +2074,7 @@ public final class QuorumController implements Controller { @@ -2075,7 +2074,7 @@ public final class QuorumController implements Controller {
new ListPartitionReassignmentsResponseData().setErrorMessage(null));
}
return appendReadEvent("listPartitionReassignments", context.deadlineNs(),
() -> replicationControl.listPartitionReassignments(request.topics()));
() -> replicationControl.listPartitionReassignments(request.topics(), currentReadOffset()));
}
@Override

8
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@ -1902,18 +1902,20 @@ public class ReplicationControlManager { @@ -1902,18 +1902,20 @@ public class ReplicationControlManager {
}
ListPartitionReassignmentsResponseData listPartitionReassignments(
List<ListPartitionReassignmentsTopics> topicList) {
List<ListPartitionReassignmentsTopics> topicList,
long epoch
) {
ListPartitionReassignmentsResponseData response =
new ListPartitionReassignmentsResponseData().setErrorMessage(null);
if (topicList == null) {
// List all reassigning topics.
for (Entry<Uuid, int[]> entry : reassigningTopics.entrySet()) {
for (Entry<Uuid, int[]> entry : reassigningTopics.entrySet(epoch)) {
listReassigningTopic(response, entry.getKey(), Replicas.toList(entry.getValue()));
}
} else {
// List the given topics.
for (ListPartitionReassignmentsTopics topic : topicList) {
Uuid topicId = topicsByName.get(topic.name());
Uuid topicId = topicsByName.get(topic.name(), epoch);
if (topicId != null) {
listReassigningTopic(response, topicId, topic.partitionIndexes());
}

109
metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java

@ -17,9 +17,18 @@ @@ -17,9 +17,18 @@
package org.apache.kafka.controller.errors;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
public class ControllerExceptions {
@ -38,4 +47,104 @@ public class ControllerExceptions { @@ -38,4 +47,104 @@ public class ControllerExceptions {
if (!(exception instanceof TimeoutException)) return false;
return true;
}
/**
 * Determine whether a throwable is a NotControllerException, looking through one level of
 * ExecutionException wrapping if present.
 *
 * @param exception     The exception to check. May be null.
 * @return              True if the exception, or the cause of a wrapping ExecutionException,
 *                      is a NotControllerException. False otherwise, including for null.
 */
public static boolean isNotControllerException(Throwable exception) {
    Throwable candidate = exception;
    if (candidate instanceof ExecutionException) {
        // Only a single level of wrapping is unwrapped.
        candidate = candidate.getCause();
    }
    // instanceof is false for null, which covers both a null argument and a null cause.
    return candidate instanceof NotControllerException;
}
/**
 * Build the NotControllerException that is returned when an operation cannot be completed
 * because the controller is in pre-migration mode.
 *
 * @param controllerId  The current controller, if one is known. Only its presence is
 *                      examined; the id itself does not appear in the message.
 * @return              The new NotControllerException.
 */
public static NotControllerException newPreMigrationException(OptionalInt controllerId) {
    String message = controllerId.isPresent() ?
        "The controller is in pre-migration mode." :
        "No controller appears to be active.";
    return new NotControllerException(message);
}
/**
 * Build the NotControllerException that is returned when the current node is not the
 * active controller.
 *
 * @param controllerId  The current controller, if one is known.
 * @return              The new NotControllerException. Its message names the active
 *                      controller node when controllerId is present.
 */
public static NotControllerException newWrongControllerException(OptionalInt controllerId) {
    if (!controllerId.isPresent()) {
        return new NotControllerException("No controller appears to be active.");
    }
    return new NotControllerException("The active controller appears to be node " +
        controllerId.getAsInt() + ".");
}
/**
 * Classify an exception as expected or unexpected. Unexpected exceptions trigger controller
 * failovers when they are raised.
 *
 * The expected exceptions are:
 * <ul>
 *   <li>ApiException: an error that should be returned to the user.</li>
 *   <li>NotLeaderException: thrown if we try to append records, but are not the leader.</li>
 *   <li>RejectedExecutionException: can happen when the controller is shutting down.</li>
 *   <li>BoundedListTooLongException: can happen if we tried to create too many records.</li>
 * </ul>
 *
 * @param exception     The exception.
 * @return              True if the exception is expected.
 */
public static boolean isExpected(Throwable exception) {
    // InterruptedException is deliberately not in this list. Interrupts might happen during
    // junit tests if the test gets stuck and must be terminated by interrupting all the
    // threads, but they are still treated as unexpected, like every other unlisted type.
    return exception instanceof ApiException ||
        exception instanceof NotLeaderException ||
        exception instanceof RejectedExecutionException ||
        exception instanceof BoundedListTooLongException;
}
/**
 * Map an exception raised inside the controller to the exception that should be exposed
 * to callers.
 *
 * @param exception                 The internal exception.
 * @param latestControllerSupplier  Supplies the id of the latest known controller. It is
 *                                  only invoked when the internal exception is a
 *                                  NotLeaderException.
 * @return                          The external equivalent. ApiExceptions are returned
 *                                  unchanged; anything not specifically recognized is
 *                                  wrapped in an UnknownServerException.
 */
public static Throwable toExternalException(
    Throwable exception,
    Supplier<OptionalInt> latestControllerSupplier
) {
    if (exception instanceof ApiException) {
        // Already suitable for returning to the caller.
        return exception;
    }
    if (exception instanceof NotLeaderException) {
        return newWrongControllerException(latestControllerSupplier.get());
    }
    if (exception instanceof RejectedExecutionException) {
        return new TimeoutException("The controller is shutting down.", exception);
    }
    if (exception instanceof BoundedListTooLongException) {
        return new PolicyViolationException("Unable to perform excessively large batch " +
            "operation.");
    }
    if (exception instanceof InterruptedException) {
        // The original InterruptedException is not attached as a cause.
        return new UnknownServerException("The controller was interrupted.");
    }
    return new UnknownServerException(exception);
}
}

7
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java

@ -43,6 +43,7 @@ public class QuorumControllerTestEnv implements AutoCloseable { @@ -43,6 +43,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
private final List<QuorumController> controllers;
private final LocalLogManagerTestEnv logEnv;
private final Map<Integer, MockFaultHandler> fatalFaultHandlers = new HashMap<>();
private final Map<Integer, MockFaultHandler> nonFatalFaultHandlers = new HashMap<>();
public static class Builder {
private final LocalLogManagerTestEnv logEnv;
@ -111,6 +112,9 @@ public class QuorumControllerTestEnv implements AutoCloseable { @@ -111,6 +112,9 @@ public class QuorumControllerTestEnv implements AutoCloseable {
MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
builder.setFatalFaultHandler(fatalFaultHandler);
fatalFaultHandlers.put(nodeId, fatalFaultHandler);
// Give this node its own non-fatal fault handler and remember it, so that any fault it
// records can be rethrown when the handlers in nonFatalFaultHandlers are checked.
// The map must hold the non-fatal handler itself, not the fatal one.
MockFaultHandler nonFatalFaultHandler = new MockFaultHandler("nonFatalFaultHandler");
builder.setNonFatalFaultHandler(nonFatalFaultHandler);
nonFatalFaultHandlers.put(nodeId, nonFatalFaultHandler);
controllerBuilderInitializer.accept(builder);
this.controllers.add(builder.build());
}
@ -165,5 +169,8 @@ public class QuorumControllerTestEnv implements AutoCloseable { @@ -165,5 +169,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
faultHandler.maybeRethrowFirstException();
}
for (MockFaultHandler faultHandler : nonFatalFaultHandlers.values()) {
faultHandler.maybeRethrowFirstException();
}
}
}

28
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

@ -1447,7 +1447,7 @@ public class ReplicationControlManagerTest { @@ -1447,7 +1447,7 @@ public class ReplicationControlManagerTest {
new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId();
ctx.createTestTopic("bar", new int[][] {
new int[] {1, 2, 3}}).topicId();
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(asList(
@ -1481,13 +1481,13 @@ public class ReplicationControlManagerTest { @@ -1481,13 +1481,13 @@ public class ReplicationControlManagerTest {
setRemovingReplicas(asList(3)).
setAddingReplicas(asList(0)).
setReplicas(asList(0, 2, 1, 3))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("bar").
setPartitionIndexes(asList(0, 1, 2)))));
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("foo").
setPartitionIndexes(asList(0, 1, 2)))));
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(asList(
@ -1550,7 +1550,7 @@ public class ReplicationControlManagerTest { @@ -1550,7 +1550,7 @@ public class ReplicationControlManagerTest {
setErrorCode(expectedError.code()))))),
alterPartitionResult.response());
ctx.replay(alterPartitionResult.records());
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
@ParameterizedTest
@ -1767,7 +1767,7 @@ public class ReplicationControlManagerTest { @@ -1767,7 +1767,7 @@ public class ReplicationControlManagerTest {
new int[] {2, 3, 4, 1}}).topicId();
Uuid barId = ctx.createTestTopic("bar", new int[][] {
new int[] {4, 3, 2}}).topicId();
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
List<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
replication.handleBrokerFenced(3, fenceRecords);
ctx.replay(fenceRecords);
@ -1822,13 +1822,13 @@ public class ReplicationControlManagerTest { @@ -1822,13 +1822,13 @@ public class ReplicationControlManagerTest {
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(asList(0, 1)).
setReplicas(asList(1, 2, 3, 4, 0))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("foo").
setPartitionIndexes(asList(0, 1, 2)))));
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("bar").
setPartitionIndexes(asList(0, 1, 2)))));
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
anonymousContextFor(ApiKeys.ALTER_PARTITION),
new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104).
@ -1870,7 +1870,7 @@ public class ReplicationControlManagerTest { @@ -1870,7 +1870,7 @@ public class ReplicationControlManagerTest {
setErrorMessage(null)))))),
cancelResult);
ctx.replay(cancelResult.records());
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
assertEquals(new PartitionRegistration(new int[] {2, 3, 4}, new int[] {4, 2},
new int[] {}, new int[] {}, 4, LeaderRecoveryState.RECOVERED, 2, 3), replication.getPartition(barId, 0));
}
@ -2429,7 +2429,7 @@ public class ReplicationControlManagerTest { @@ -2429,7 +2429,7 @@ public class ReplicationControlManagerTest {
log.debug("Created topic with ID {}", topicId);
// Confirm we start off with no reassignments.
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
// Reassign to [2, 3]
ControllerResult<AlterPartitionReassignmentsResponseData> alterResultOne =
@ -2455,7 +2455,7 @@ public class ReplicationControlManagerTest { @@ -2455,7 +2455,7 @@ public class ReplicationControlManagerTest {
setReplicas(asList(2, 3, 0, 1))))));
// Make sure the reassignment metadata is as expected.
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
PartitionRegistration partition = replication.getPartition(topicId, 0);
@ -2527,7 +2527,7 @@ public class ReplicationControlManagerTest { @@ -2527,7 +2527,7 @@ public class ReplicationControlManagerTest {
setAddingReplicas(asList(4, 5)).
setReplicas(asList(4, 5, 0, 1, 2, 3))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
// Make sure the leader is in the replicas still
partition = replication.getPartition(topicId, 0);
@ -2561,7 +2561,7 @@ public class ReplicationControlManagerTest { @@ -2561,7 +2561,7 @@ public class ReplicationControlManagerTest {
// After reassignment is finally complete, make sure 4 is the leader now.
partition = replication.getPartition(topicId, 0);
assertEquals(4, partition.leader);
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
private static BrokerState brokerState(int brokerId, Long brokerEpoch) {

108
metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java

@ -17,15 +17,27 @@ @@ -17,15 +17,27 @@
package org.apache.kafka.controller.errors;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import static org.apache.kafka.controller.errors.ControllerExceptions.isExpected;
import static org.apache.kafka.controller.errors.ControllerExceptions.isTimeoutException;
import static org.apache.kafka.controller.errors.ControllerExceptions.newPreMigrationException;
import static org.apache.kafka.controller.errors.ControllerExceptions.newWrongControllerException;
import static org.apache.kafka.controller.errors.ControllerExceptions.toExternalException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -62,4 +74,100 @@ public class ControllerExceptionsTest { @@ -62,4 +74,100 @@ public class ControllerExceptionsTest {
public void testExecutionExceptionWithNullCauseIsNotTimeoutException() {
assertFalse(isTimeoutException(new ExecutionException(null)));
}
// newPreMigrationException with no known controller reports that no controller is active.
@Test
public void testNewPreMigrationExceptionWithNoController() {
assertExceptionsMatch(new NotControllerException("No controller appears to be active."),
newPreMigrationException(OptionalInt.empty()));
}
// newPreMigrationException with a known controller reports pre-migration mode; the
// controller id (1 here) does not appear in the message.
@Test
public void testNewPreMigrationExceptionWithActiveController() {
assertExceptionsMatch(new NotControllerException("The controller is in pre-migration mode."),
newPreMigrationException(OptionalInt.of(1)));
}
// newWrongControllerException with no known controller reports that no controller is active.
@Test
public void testNewWrongControllerExceptionWithNoController() {
assertExceptionsMatch(new NotControllerException("No controller appears to be active."),
newWrongControllerException(OptionalInt.empty()));
}
// newWrongControllerException with a known controller names that node in the message.
@Test
public void testNewWrongControllerExceptionWithActiveController() {
assertExceptionsMatch(new NotControllerException("The active controller appears to be node 1."),
newWrongControllerException(OptionalInt.of(1)));
}
// TopicExistsException stands in for the ApiException family, which isExpected accepts.
@Test
public void testApiExceptionIsExpected() {
assertTrue(isExpected(new TopicExistsException("")));
}
// NotLeaderException is classified as expected.
@Test
public void testNotLeaderExceptionIsExpected() {
assertTrue(isExpected(new NotLeaderException("")));
}
// RejectedExecutionException is classified as expected.
@Test
public void testRejectedExecutionExceptionIsExpected() {
assertTrue(isExpected(new RejectedExecutionException()));
}
// InterruptedException is classified as unexpected.
@Test
public void testInterruptedExceptionIsNotExpected() {
assertFalse(isExpected(new InterruptedException()));
}
// An arbitrary runtime exception (NullPointerException here) is classified as unexpected.
@Test
public void testRuntimeExceptionIsNotExpected() {
assertFalse(isExpected(new NullPointerException()));
}
/**
 * Assert that two throwables have the same class and message, and that their cause chains
 * match recursively in the same way (including both chains ending at the same depth).
 *
 * @param a     The expected throwable.
 * @param b     The actual throwable.
 */
private static void assertExceptionsMatch(Throwable a, Throwable b) {
    assertEquals(a.getClass(), b.getClass());
    assertEquals(a.getMessage(), b.getMessage());
    Throwable expectedCause = a.getCause();
    Throwable actualCause = b.getCause();
    if (expectedCause == null) {
        // The expected chain ends here, so the actual chain must end here too.
        assertNull(actualCause);
        return;
    }
    assertNotNull(actualCause);
    assertExceptionsMatch(expectedCause, actualCause);
}
// An ApiException (TopicExistsException here) maps to an exception of the same class and
// message, with no cause.
@Test
public void testApiExceptionToExternalException() {
assertExceptionsMatch(new TopicExistsException("Topic foo exists"),
toExternalException(new TopicExistsException("Topic foo exists"),
() -> OptionalInt.of(1)));
}
// NotLeaderException maps to a NotControllerException naming the controller returned by
// the supplier; the original NotLeaderException message is not carried over.
@Test
public void testNotLeaderExceptionToExternalException() {
assertExceptionsMatch(new NotControllerException("The active controller appears to be node 1."),
toExternalException(new NotLeaderException("Append failed because the given epoch 123 is stale."),
() -> OptionalInt.of(1)));
}
// RejectedExecutionException maps to a TimeoutException that keeps the original as its cause.
@Test
public void testRejectedExecutionExceptionToExternalException() {
assertExceptionsMatch(new TimeoutException("The controller is shutting down.",
new RejectedExecutionException("The event queue is shutting down")),
toExternalException(new RejectedExecutionException("The event queue is shutting down"),
() -> OptionalInt.empty()));
}
// InterruptedException maps to an UnknownServerException with a fixed message and no cause.
@Test
public void testInterruptedExceptionToExternalException() {
assertExceptionsMatch(new UnknownServerException("The controller was interrupted."),
toExternalException(new InterruptedException(),
() -> OptionalInt.empty()));
}
// Any other exception (NullPointerException here) is wrapped as the cause of an
// UnknownServerException.
@Test
public void testRuntimeExceptionToExternalException() {
assertExceptionsMatch(new UnknownServerException(new NullPointerException("Null pointer exception")),
toExternalException(new NullPointerException("Null pointer exception"),
() -> OptionalInt.empty()));
}
}

Loading…
Cancel
Save