Browse Source

KAFKA-15019: Improve handling of broker heartbeat timeouts (#13759)

When the active KRaft controller is overloaded, it will not be able to process broker heartbeat
requests. Instead, they will be timed out. When using the default configuration, this will happen
if the time needed to process a broker heartbeat climbs above a second for a sustained period.
This, in turn, could lead to brokers being improperly fenced when they are still alive.

With this PR, timed out heartbeats will still update the lastContactNs and metadataOffset of the
broker in the BrokerHeartbeatManager. While we don't generate any records, this should still be
adequate to prevent spurious fencing. We also log a message at ERROR level so that this condition
will be more obvious.

Other small changes in this PR: fix grammar issue in log4j of BrokerHeartbeatManager. Add JavaDoc
for ClusterControlManager#zkMigrationEnabled field. Add builder for ReplicationControlTestContext
to avoid having tons of constructors. Update ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS to
match the default in KafkaConfig.

Reviewers: Ismael Juma <ijuma@apache.org>, Ron Dagostino <rdagostino@confluent.io>
pull/13790/head
Colin Patrick McCabe 1 year ago committed by GitHub
parent
commit
9b3db6d50a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
  2. 4
      metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
  3. 5
      metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
  4. 10
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  5. 21
      metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
  6. 41
      metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
  7. 13
      metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
  8. 149
      metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
  9. 65
      metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java

28
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala

@ -34,6 +34,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, @@ -34,6 +34,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer._
@ -1106,6 +1107,33 @@ class KRaftClusterTest { @@ -1106,6 +1107,33 @@ class KRaftClusterTest {
cluster.close()
}
}
@Test
def testTimedOutHeartbeats(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(3).
setNumControllerNodes(1).build()).
setConfigProp(KafkaConfig.BrokerHeartbeatIntervalMsProp, 10.toString).
setConfigProp(KafkaConfig.BrokerSessionTimeoutMsProp, 1000.toString).
build()
try {
cluster.format()
cluster.startup()
val controller = cluster.controllers().values().iterator().next()
controller.controller.waitForReadyBrokers(3).get()
TestUtils.retry(60000) {
val latch = controller.controller.asInstanceOf[QuorumController].pause()
Thread.sleep(1001)
latch.countDown()
assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
assertTrue(controller.quorumControllerMetrics.timedOutHeartbeats() > 0,
"Expected timedOutHeartbeats to be greater than 0.");
}
} finally {
cluster.close()
}
}
}
class BadAuthorizer() extends Authorizer {

4
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java

@ -550,14 +550,14 @@ public class BrokerHeartbeatManager { @@ -550,14 +550,14 @@ public class BrokerHeartbeatManager {
} else if (!request.wantFence()) {
if (request.currentMetadataOffset() >= registerBrokerRecordOffset) {
log.info("The request from broker {} to unfence has been granted " +
"because it has caught up with the offset of it's register " +
"because it has caught up with the offset of its register " +
"broker record {}.", brokerId, registerBrokerRecordOffset);
return new BrokerControlStates(currentState, UNFENCED);
} else {
if (log.isDebugEnabled()) {
log.debug("The request from broker {} to unfence cannot yet " +
"be granted because it has not caught up with the offset of " +
"it's register broker record {}. It is still at offset {}.",
"its register broker record {}. It is still at offset {}.",
brokerId, registerBrokerRecordOffset, request.currentMetadataOffset());
}
return new BrokerControlStates(currentState, FENCED);

5
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

@ -74,7 +74,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -74,7 +74,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
* brokers being fenced or unfenced, and broker feature versions.
*/
public class ClusterControlManager {
final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(18, TimeUnit.SECONDS);
final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(9, TimeUnit.SECONDS);
static class Builder {
private LogContext logContext = null;
@ -236,6 +236,9 @@ public class ClusterControlManager { @@ -236,6 +236,9 @@ public class ClusterControlManager {
*/
private final FeatureControlManager featureControl;
/**
* True if migration from ZK is enabled.
*/
private final boolean zkMigrationEnabled;
private ClusterControlManager(

10
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@ -78,6 +78,7 @@ import org.apache.kafka.common.requests.ApiError; @@ -78,6 +78,7 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
@ -2133,7 +2134,12 @@ public final class QuorumController implements Controller { @@ -2133,7 +2134,12 @@ public final class QuorumController implements Controller {
}
}
},
EnumSet.of(RUNS_IN_PREMIGRATION));
EnumSet.of(RUNS_IN_PREMIGRATION)).whenComplete((__, t) -> {
if (ControllerExceptions.isTimeoutException(t)) {
replicationControl.processExpiredBrokerHeartbeat(request);
controllerMetrics.incrementTimedOutHeartbeats();
}
});
}
@Override
@ -2286,7 +2292,7 @@ public final class QuorumController implements Controller { @@ -2286,7 +2292,7 @@ public final class QuorumController implements Controller {
}
// VisibleForTesting
CountDownLatch pause() {
public CountDownLatch pause() {
final CountDownLatch latch = new CountDownLatch(1);
appendControlEvent("pause", () -> {
try {

21
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@ -1384,7 +1384,9 @@ public class ReplicationControlManager { @@ -1384,7 +1384,9 @@ public class ReplicationControlManager {
}
ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
BrokerHeartbeatRequestData request,
long registerBrokerRecordOffset
) {
int brokerId = request.brokerId();
long brokerEpoch = request.brokerEpoch();
clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
@ -1419,6 +1421,23 @@ public class ReplicationControlManager { @@ -1419,6 +1421,23 @@ public class ReplicationControlManager {
return ControllerResult.of(records, reply);
}
/**
* Process a broker heartbeat which has been sitting on the queue for too long, and has
* expired. With default settings, this would happen after 1 second. We process expired
* heartbeats by updating the lastSeenNs of the broker, so that the broker won't get fenced
* incorrectly. However, we don't perform any state changes that we normally would, such as
* unfencing a fenced broker, etc.
*/
void processExpiredBrokerHeartbeat(BrokerHeartbeatRequestData request) {
int brokerId = request.brokerId();
clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
clusterControl.heartbeatManager().touch(brokerId,
clusterControl.brokerRegistrations().get(brokerId).fenced(),
request.currentMetadataOffset());
log.error("processExpiredBrokerHeartbeat: controller event queue overloaded. Timed out " +
"heartbeat from broker {}.", brokerId);
}
public ControllerResult<Void> unregisterBroker(int brokerId) {
BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
if (registration == null) {

41
metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java

@ -0,0 +1,41 @@ @@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.errors;
import org.apache.kafka.common.errors.TimeoutException;
import java.util.concurrent.ExecutionException;
public class ControllerExceptions {
/**
* Check if an exception is a normal timeout exception.
*
* @param exception The exception to check.
* @return True if the exception is a timeout exception.
*/
public static boolean isTimeoutException(Throwable exception) {
if (exception == null) return false;
if (exception instanceof ExecutionException) {
exception = exception.getCause();
if (exception == null) return false;
}
if (!(exception instanceof TimeoutException)) return false;
return true;
}
}

13
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java

@ -60,6 +60,7 @@ public class QuorumControllerMetrics implements AutoCloseable { @@ -60,6 +60,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private Consumer<Long> newHistogram(MetricName name, boolean biased) {
if (registry.isPresent()) {
@ -150,6 +151,18 @@ public class QuorumControllerMetrics implements AutoCloseable { @@ -150,6 +151,18 @@ public class QuorumControllerMetrics implements AutoCloseable {
return lastAppliedRecordTimestamp.get();
}
public void incrementTimedOutHeartbeats() {
timedOutHeartbeats.addAndGet(1);
}
public void setTimedOutHeartbeats(long heartbeats) {
timedOutHeartbeats.set(heartbeats);
}
public long timedOutHeartbeats() {
return timedOutHeartbeats.get();
}
@Override
public void close() {
registry.ifPresent(r -> Arrays.asList(

149
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

@ -72,6 +72,7 @@ import org.apache.kafka.common.utils.LogContext; @@ -72,6 +72,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatState;
import org.apache.kafka.controller.ReplicationControlManager.KRaftClusterDescriber;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
@ -154,9 +155,36 @@ public class ReplicationControlManagerTest { @@ -154,9 +155,36 @@ public class ReplicationControlManagerTest {
private final static int BROKER_SESSION_TIMEOUT_MS = 1000;
private static class ReplicationControlTestContext {
private static class Builder {
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private MetadataVersion metadataVersion = MetadataVersion.latest();
private MockTime mockTime = new MockTime();
Builder setCreateTopicPolicy(CreateTopicPolicy createTopicPolicy) {
this.createTopicPolicy = Optional.of(createTopicPolicy);
return this;
}
Builder setMetadataVersion(MetadataVersion metadataVersion) {
this.metadataVersion = metadataVersion;
return this;
}
Builder setMockTime(MockTime mockTime) {
this.mockTime = mockTime;
return this;
}
ReplicationControlTestContext build() {
return new ReplicationControlTestContext(metadataVersion,
createTopicPolicy,
mockTime);
}
}
final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
final LogContext logContext = new LogContext();
final MockTime time = new MockTime();
final MockTime time;
final MockRandom random = new MockRandom();
final FeatureControlManager featureControl;
final ClusterControlManager clusterControl;
@ -171,19 +199,12 @@ public class ReplicationControlManagerTest { @@ -171,19 +199,12 @@ public class ReplicationControlManagerTest {
RecordTestUtils.replayAll(replicationControl, records);
}
ReplicationControlTestContext() {
this(Optional.empty());
}
ReplicationControlTestContext(MetadataVersion metadataVersion) {
this(metadataVersion, Optional.empty());
}
ReplicationControlTestContext(Optional<CreateTopicPolicy> createTopicPolicy) {
this(MetadataVersion.latest(), createTopicPolicy);
}
ReplicationControlTestContext(MetadataVersion metadataVersion, Optional<CreateTopicPolicy> createTopicPolicy) {
private ReplicationControlTestContext(
MetadataVersion metadataVersion,
Optional<CreateTopicPolicy> createTopicPolicy,
MockTime time
) {
this.time = time;
this.featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
@ -469,7 +490,7 @@ public class ReplicationControlManagerTest { @@ -469,7 +490,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreateTopics() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
@ -525,7 +546,7 @@ public class ReplicationControlManagerTest { @@ -525,7 +546,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreateTopicsWithMutationQuotaExceeded() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
@ -545,7 +566,7 @@ public class ReplicationControlManagerTest { @@ -545,7 +566,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreateTopicsISRInvariants() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
@ -586,7 +607,7 @@ public class ReplicationControlManagerTest { @@ -586,7 +607,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreateTopicsWithConfigs() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
@ -692,7 +713,7 @@ public class ReplicationControlManagerTest { @@ -692,7 +713,7 @@ public class ReplicationControlManagerTest {
@ParameterizedTest(name = "testCreateTopicsWithValidateOnlyFlag with mutationQuotaExceeded: {0}")
@ValueSource(booleans = {true, false})
public void testCreateTopicsWithValidateOnlyFlag(boolean mutationQuotaExceeded) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
CreateTopicsRequestData request = new CreateTopicsRequestData().setValidateOnly(true);
@ -714,7 +735,7 @@ public class ReplicationControlManagerTest { @@ -714,7 +735,7 @@ public class ReplicationControlManagerTest {
@Test
public void testInvalidCreateTopicsWithValidateOnlyFlag() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
CreateTopicsRequestData request = new CreateTopicsRequestData().setValidateOnly(true);
@ -745,8 +766,9 @@ public class ReplicationControlManagerTest { @@ -745,8 +766,9 @@ public class ReplicationControlManagerTest {
Collections.singletonMap(SEGMENT_BYTES_CONFIG, "12300000")),
new CreateTopicPolicy.RequestMetadata("quux", null, null,
Collections.singletonMap(0, asList(2, 1, 0)), Collections.emptyMap())));
ReplicationControlTestContext ctx =
new ReplicationControlTestContext(Optional.of(createTopicPolicy));
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
setCreateTopicPolicy(createTopicPolicy).
build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
ctx.createTestTopic("foo", 2, (short) 2, NONE.code());
@ -758,7 +780,7 @@ public class ReplicationControlManagerTest { @@ -758,7 +780,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreateTopicWithCollisionChars() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext(Optional.empty());
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
@ -808,7 +830,7 @@ public class ReplicationControlManagerTest { @@ -808,7 +830,7 @@ public class ReplicationControlManagerTest {
@Test
public void testRemoveLeaderships() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3);
ctx.unfenceBrokers(0, 1, 2, 3);
@ -833,7 +855,7 @@ public class ReplicationControlManagerTest { @@ -833,7 +855,7 @@ public class ReplicationControlManagerTest {
@Test
public void testShrinkAndExpandIsr() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
@ -863,7 +885,7 @@ public class ReplicationControlManagerTest { @@ -863,7 +885,7 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionHandleUnknownTopicIdOrName(short version) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
@ -900,7 +922,7 @@ public class ReplicationControlManagerTest { @@ -900,7 +922,7 @@ public class ReplicationControlManagerTest {
@Test
public void testInvalidAlterPartitionRequests() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
@ -1073,7 +1095,7 @@ public class ReplicationControlManagerTest { @@ -1073,7 +1095,7 @@ public class ReplicationControlManagerTest {
@Test
public void testDeleteTopics() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs =
@ -1148,7 +1170,7 @@ public class ReplicationControlManagerTest { @@ -1148,7 +1170,7 @@ public class ReplicationControlManagerTest {
@Test
public void testDeleteTopicsWithMutationQuotaExceeded() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
@ -1175,7 +1197,7 @@ public class ReplicationControlManagerTest { @@ -1175,7 +1197,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreatePartitions() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
@ -1261,7 +1283,7 @@ public class ReplicationControlManagerTest { @@ -1261,7 +1283,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreatePartitionsWithMutationQuotaExceeded() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
@ -1300,7 +1322,7 @@ public class ReplicationControlManagerTest { @@ -1300,7 +1322,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
@ -1336,7 +1358,7 @@ public class ReplicationControlManagerTest { @@ -1336,7 +1358,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreatePartitionsISRInvariants() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
@ -1376,7 +1398,7 @@ public class ReplicationControlManagerTest { @@ -1376,7 +1398,7 @@ public class ReplicationControlManagerTest {
@Test
public void testValidateGoodManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(1, 2, 3);
ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1)),
OptionalInt.of(1));
@ -1390,7 +1412,7 @@ public class ReplicationControlManagerTest { @@ -1390,7 +1412,7 @@ public class ReplicationControlManagerTest {
@Test
public void testValidateBadManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(1, 2);
assertEquals("The manual partition assignment includes an empty replica list.",
assertThrows(InvalidReplicaAssignmentException.class, () ->
@ -1417,7 +1439,7 @@ public class ReplicationControlManagerTest { @@ -1417,7 +1439,7 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testReassignPartitions(short version) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3);
ctx.unfenceBrokers(0, 1, 2, 3);
@ -1534,7 +1556,7 @@ public class ReplicationControlManagerTest { @@ -1534,7 +1556,7 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@ -1612,7 +1634,7 @@ public class ReplicationControlManagerTest { @@ -1612,7 +1634,7 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectBrokersWithStaleEpoch(short version) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@ -1681,7 +1703,7 @@ public class ReplicationControlManagerTest { @@ -1681,7 +1703,7 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@ -1736,7 +1758,7 @@ public class ReplicationControlManagerTest { @@ -1736,7 +1758,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCancelReassignPartitions() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@ -1855,7 +1877,7 @@ public class ReplicationControlManagerTest { @@ -1855,7 +1877,7 @@ public class ReplicationControlManagerTest {
@Test
public void testManualPartitionAssignmentOnAllFencedBrokers() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(0, 1, 2, 3);
ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}},
INVALID_REPLICA_ASSIGNMENT.code());
@ -1863,7 +1885,7 @@ public class ReplicationControlManagerTest { @@ -1863,7 +1885,7 @@ public class ReplicationControlManagerTest {
@Test
public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(0, 1, 2, 3, 4, 5);
ctx.unfenceBrokers(0, 1, 2);
Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
@ -1892,7 +1914,7 @@ public class ReplicationControlManagerTest { @@ -1892,7 +1914,7 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@ -1978,7 +2000,7 @@ public class ReplicationControlManagerTest { @@ -1978,7 +2000,7 @@ public class ReplicationControlManagerTest {
@Test
public void testPreferredElectionDoesNotTriggerUncleanElection() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(1, 2, 3, 4);
ctx.unfenceBrokers(1, 2, 3, 4);
@ -2031,7 +2053,7 @@ public class ReplicationControlManagerTest { @@ -2031,7 +2053,7 @@ public class ReplicationControlManagerTest {
@Test
public void testFenceMultipleBrokers() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
@ -2061,7 +2083,7 @@ public class ReplicationControlManagerTest { @@ -2061,7 +2083,7 @@ public class ReplicationControlManagerTest {
@Test
public void testElectPreferredLeaders() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(1, 2, 3, 4);
@ -2181,7 +2203,7 @@ public class ReplicationControlManagerTest { @@ -2181,7 +2203,7 @@ public class ReplicationControlManagerTest {
@Test
public void testBalancePartitionLeaders() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(2, 3, 4);
@ -2307,7 +2329,7 @@ public class ReplicationControlManagerTest { @@ -2307,7 +2329,7 @@ public class ReplicationControlManagerTest {
@Test
public void testKRaftClusterDescriber() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(2, 3, 4);
@ -2329,7 +2351,9 @@ public class ReplicationControlManagerTest { @@ -2329,7 +2351,9 @@ public class ReplicationControlManagerTest {
@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
public void testProcessBrokerHeartbeatInControlledShutdown(MetadataVersion metadataVersion) throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext(metadataVersion);
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
setMetadataVersion(metadataVersion).
build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
@ -2367,9 +2391,34 @@ public class ReplicationControlManagerTest { @@ -2367,9 +2391,34 @@ public class ReplicationControlManagerTest {
assertEquals(expectedRecords, result.records());
}
@Test
public void testProcessExpiredBrokerHeartbeat() throws Exception {
MockTime mockTime = new MockTime(0, 0, 0);
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
setMockTime(mockTime).
build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
BrokerHeartbeatRequestData heartbeatRequest = new BrokerHeartbeatRequestData().
setBrokerId(0).
setBrokerEpoch(100).
setCurrentMetadataOffset(123).
setWantShutDown(false);
mockTime.sleep(100);
ctx.replicationControl.processExpiredBrokerHeartbeat(heartbeatRequest);
Optional<BrokerHeartbeatState> state =
ctx.clusterControl.heartbeatManager().brokers().stream().
filter(broker -> broker.id() == 0).findFirst();
assertTrue(state.isPresent());
assertEquals(0, state.get().id());
assertEquals(100000000L, state.get().lastContactNs);
assertEquals(123, state.get().metadataOffset);
}
@Test
public void testReassignPartitionsHandlesNewReassignmentThatRemovesPreviouslyAddingReplicas() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4, 5);
ctx.unfenceBrokers(0, 1, 2, 3, 4, 5);

65
metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java

@ -0,0 +1,65 @@ @@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller.errors;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.controller.errors.ControllerExceptions.isTimeoutException;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class ControllerExceptionsTest {
@Test
public void testTimeoutExceptionIsTimeoutException() {
assertTrue(isTimeoutException(new TimeoutException()));
}
@Test
public void testWrappedTimeoutExceptionIsTimeoutException() {
assertTrue(isTimeoutException(
new ExecutionException("execution exception",
new TimeoutException())));
}
@Test
public void testRuntimeExceptionIsNotTimeoutException() {
assertFalse(isTimeoutException(new RuntimeException()));
}
@Test
public void testWrappedRuntimeExceptionIsNotTimeoutException() {
assertFalse(isTimeoutException(new ExecutionException(new RuntimeException())));
}
@Test
public void testTopicExistsExceptionIsNotTimeoutException() {
assertFalse(isTimeoutException(new TopicExistsException("Topic exists.")));
}
@Test
public void testExecutionExceptionWithNullCauseIsNotTimeoutException() {
assertFalse(isTimeoutException(new ExecutionException(null)));
}
}
Loading…
Cancel
Save