Browse Source

KAFKA-15582: Identify clean shutdown broker (#14465)

The PR includes:

* Added a new class of CleanShutdownFile which helps write and read from a clean shutdown file.
* Updated the BrokerRegistration API.
* Client side handling for the broker epoch.
* Minimum work on the controller side.

Reviewers: Jun Rao <junrao@gmail.com>
pull/14502/merge
Calvin Liu 1 year ago committed by GitHub
parent
commit
14029e2ddd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      checkstyle/import-control-core.xml
  2. 8
      clients/src/main/resources/common/message/BrokerRegistrationRequest.json
  3. 4
      clients/src/main/resources/common/message/BrokerRegistrationResponse.json
  4. 113
      core/src/main/java/kafka/log/CleanShutdownFileHandler.java
  5. 13
      core/src/main/scala/kafka/log/LogLoader.scala
  6. 36
      core/src/main/scala/kafka/log/LogManager.scala
  7. 16
      core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
  8. 5
      core/src/main/scala/kafka/server/BrokerServer.scala
  9. 3
      core/src/main/scala/kafka/server/KafkaServer.scala
  10. 57
      core/src/test/java/kafka/log/CleanShutdownFileHandlerTest.java
  11. 12
      core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
  12. 39
      core/src/test/scala/unit/kafka/log/LogManagerTest.scala
  13. 19
      core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
  14. 2
      core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
  15. 9
      core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
  16. 2
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
  17. 2
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
  18. 2
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
  19. 2
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
  20. 7
      metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
  21. 2
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  22. 12
      metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java

7
checkstyle/import-control-core.xml

@ -73,7 +73,10 @@
<allow pkg="org.apache.kafka.clients" /> <allow pkg="org.apache.kafka.clients" />
</subpackage> </subpackage>
<subpackage name="log.remote"> <subpackage name="log">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="remote">
<allow pkg="org.apache.kafka.server.common" /> <allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.log.remote" /> <allow pkg="org.apache.kafka.server.log.remote" />
<allow pkg="org.apache.kafka.server.metrics" /> <allow pkg="org.apache.kafka.server.metrics" />
@ -82,7 +85,7 @@
<allow pkg="kafka.cluster" /> <allow pkg="kafka.cluster" />
<allow pkg="kafka.server" /> <allow pkg="kafka.server" />
<allow pkg="org.mockito" /> <allow pkg="org.mockito" />
<allow pkg="org.apache.kafka.test" /> </subpackage>
</subpackage> </subpackage>
<subpackage name="server"> <subpackage name="server">

8
clients/src/main/resources/common/message/BrokerRegistrationRequest.json

@ -14,12 +14,14 @@
// limitations under the License. // limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode. // Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
//
// Version 2 adds the PreviousBrokerEpoch for the KIP-966
{ {
"apiKey":62, "apiKey":62,
"type": "request", "type": "request",
"listeners": ["controller"], "listeners": ["controller"],
"name": "BrokerRegistrationRequest", "name": "BrokerRegistrationRequest",
"validVersions": "0-1", "validVersions": "0-2",
"flexibleVersions": "0+", "flexibleVersions": "0+",
"fields": [ "fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@ -53,6 +55,8 @@
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The rack which this broker is in." }, "about": "The rack which this broker is in." },
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false", { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
"about": "If the required configurations for ZK migration are present, this value is set to true" } "about": "If the required configurations for ZK migration are present, this value is set to true" },
{ "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
"about": "The epoch before a clean shutdown." }
] ]
} }

4
clients/src/main/resources/common/message/BrokerRegistrationResponse.json

@ -14,11 +14,13 @@
// limitations under the License. // limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode. // Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
//
// Version 2 adds the PreviousBrokerEpoch to the request for the KIP-966
{ {
"apiKey": 62, "apiKey": 62,
"type": "response", "type": "response",
"name": "BrokerRegistrationResponse", "name": "BrokerRegistrationResponse",
"validVersions": "0-1", "validVersions": "0-2",
"flexibleVersions": "0+", "flexibleVersions": "0+",
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

113
core/src/main/java/kafka/log/CleanShutdownFileHandler.java

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
/**
* Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
* This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
* avoided by passing in the recovery point, however finding the correct position to do this
* requires accessing the offset index which may not be safe in an unclean shutdown.
* For more information see the discussion in PR#2104
*
* Also, the clean shutdown file can also store the broker epoch, this can be used in the broker registration to
* demonstrate the last reboot is a clean shutdown. (KIP-966)
*/
public class CleanShutdownFileHandler {
public static final String CLEAN_SHUTDOWN_FILE_NAME = ".kafka_cleanshutdown";
private final File cleanShutdownFile;
private static final int CURRENT_VERSION = 0;
private final Logger logger;
private enum Fields {
VERSION,
BROKER_EPOCH;
@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
public CleanShutdownFileHandler(String dirPath) {
logger = new LogContext().logger(CleanShutdownFileHandler.class);
this.cleanShutdownFile = new File(dirPath, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME);
}
public void write(long brokerEpoch) throws Exception {
write(brokerEpoch, CURRENT_VERSION);
}
// visible to test.
void write(long brokerEpoch, int version) throws Exception {
FileOutputStream os = new FileOutputStream(cleanShutdownFile);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8));
try {
Map<String, String> payload = new HashMap<>();
payload.put(Fields.VERSION.toString(), Integer.toString(version));
payload.put(Fields.BROKER_EPOCH.toString(), Long.toString(brokerEpoch));
bw.write(new ObjectMapper().writeValueAsString(payload));
bw.flush();
os.getFD().sync();
} finally {
bw.close();
os.close();
}
}
public long read() {
long brokerEpoch = -1L;
try {
String text = Utils.readFileAsString(cleanShutdownFile.toPath().toString());
Map<String, String> content = new ObjectMapper().readValue(text, HashMap.class);
brokerEpoch = Long.parseLong(content.getOrDefault(Fields.BROKER_EPOCH.toString(), "-1L"));
} catch (Exception e) {
logger.warn("Fail to read the clean shutdown file in " + cleanShutdownFile.toPath() + ":" + e);
}
return brokerEpoch;
}
public void delete() throws Exception {
Files.deleteIfExists(cleanShutdownFile.toPath());
}
public boolean exists() {
return cleanShutdownFile.exists();
}
@Override
public String toString() {
return "CleanShutdownFile=(" + "file=" + cleanShutdownFile.toString() + ')';
}
}

13
core/src/main/scala/kafka/log/LogLoader.scala

@ -35,19 +35,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.{Set, mutable} import scala.collection.{Set, mutable}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
object LogLoader extends Logging {
/**
* Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
* This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
* avoided by passing in the recovery point, however finding the correct position to do this
* requires accessing the offset index which may not be safe in an unclean shutdown.
* For more information see the discussion in PR#2104
*/
val CleanShutdownFile = ".kafka_cleanshutdown"
}
/** /**
* @param dir The directory from which log segments need to be loaded * @param dir The directory from which log segments need to be loaded
* @param topicPartition The topic partition associated with the log being loaded * @param topicPartition The topic partition associated with the log being loaded

36
core/src/main/scala/kafka/log/LogManager.scala

@ -408,11 +408,11 @@ class LogManager(logDirs: Seq[File],
new LogRecoveryThreadFactory(logDirAbsolutePath)) new LogRecoveryThreadFactory(logDirAbsolutePath))
threadPools.append(pool) threadPools.append(pool)
val cleanShutdownFile = new File(dir, LogLoader.CleanShutdownFile) val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
if (cleanShutdownFile.exists) { if (cleanShutdownFileHandler.exists()) {
// Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
// so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471 // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
Files.deleteIfExists(cleanShutdownFile.toPath) cleanShutdownFileHandler.delete()
hadCleanShutdown = true hadCleanShutdown = true
} }
hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown) hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
@ -625,7 +625,7 @@ class LogManager(logDirs: Seq[File],
/** /**
* Close all the logs * Close all the logs
*/ */
def shutdown(): Unit = { def shutdown(brokerEpoch: Long = -1): Unit = {
info("Shutting down.") info("Shutting down.")
metricsGroup.removeMetric("OfflineLogDirectoryCount") metricsGroup.removeMetric("OfflineLogDirectoryCount")
@ -684,8 +684,9 @@ class LogManager(logDirs: Seq[File],
val logDirAbsolutePath = dir.getAbsolutePath val logDirAbsolutePath = dir.getAbsolutePath
if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) || if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) { loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
debug(s"Writing clean shutdown marker at $dir") val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this) debug(s"Writing clean shutdown marker at $dir with broker epoch=$brokerEpoch")
CoreUtils.swallow(cleanShutdownFileHandler.write(brokerEpoch), this)
} }
} }
} }
@ -1409,6 +1410,29 @@ class LogManager(logDirs: Seq[File],
None None
} }
} }
def readBrokerEpochFromCleanShutdownFiles(): Long = {
// Verify whether all the log dirs have the same broker epoch in their clean shutdown files. If there is any dir not
// live, fail the broker epoch check.
if (liveLogDirs.size < logDirs.size) {
return -1L
}
var brokerEpoch = -1L
for (dir <- liveLogDirs) {
val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
val currentBrokerEpoch = cleanShutdownFileHandler.read
if (currentBrokerEpoch == -1L) {
info(s"Unable to read the broker epoch in ${dir.toString}.")
return -1L
}
if (brokerEpoch != -1 && currentBrokerEpoch != brokerEpoch) {
info(s"Found different broker epochs in ${dir.toString}. Other=$brokerEpoch vs current=$currentBrokerEpoch.")
return -1L
}
brokerEpoch = currentBrokerEpoch
}
brokerEpoch
}
} }
object LogManager { object LogManager {

16
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala

@ -187,6 +187,11 @@ class BrokerLifecycleManager(
*/ */
private var _channelManager: NodeToControllerChannelManager = _ private var _channelManager: NodeToControllerChannelManager = _
/**
* The broker epoch from the previous run, or -1 if the epoch is not able to be found.
*/
@volatile private var previousBrokerEpoch: Long = -1L
/** /**
* The event queue. * The event queue.
*/ */
@ -201,12 +206,18 @@ class BrokerLifecycleManager(
* @param highestMetadataOffsetProvider Provides the current highest metadata offset. * @param highestMetadataOffsetProvider Provides the current highest metadata offset.
* @param channelManager The NodeToControllerChannelManager to use. * @param channelManager The NodeToControllerChannelManager to use.
* @param clusterId The cluster ID. * @param clusterId The cluster ID.
* @param advertisedListeners The advertised listeners for this broker.
* @param supportedFeatures The features for this broker.
* @param previousBrokerEpoch The broker epoch before the reboot.
*
*/ */
def start(highestMetadataOffsetProvider: () => Long, def start(highestMetadataOffsetProvider: () => Long,
channelManager: NodeToControllerChannelManager, channelManager: NodeToControllerChannelManager,
clusterId: String, clusterId: String,
advertisedListeners: ListenerCollection, advertisedListeners: ListenerCollection,
supportedFeatures: util.Map[String, VersionRange]): Unit = { supportedFeatures: util.Map[String, VersionRange],
previousBrokerEpoch: Long): Unit = {
this.previousBrokerEpoch = previousBrokerEpoch
eventQueue.append(new StartupEvent(highestMetadataOffsetProvider, eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
channelManager, clusterId, advertisedListeners, supportedFeatures)) channelManager, clusterId, advertisedListeners, supportedFeatures))
} }
@ -310,7 +321,8 @@ class BrokerLifecycleManager(
setFeatures(features). setFeatures(features).
setIncarnationId(incarnationId). setIncarnationId(incarnationId).
setListeners(_advertisedListeners). setListeners(_advertisedListeners).
setRack(rack.orNull) setRack(rack.orNull).
setPreviousBrokerEpoch(previousBrokerEpoch)
if (isDebugEnabled) { if (isDebugEnabled) {
debug(s"Sending broker registration $data") debug(s"Sending broker registration $data")
} }

5
core/src/main/scala/kafka/server/BrokerServer.scala

@ -335,7 +335,8 @@ class BrokerServer(
brokerLifecycleChannelManager, brokerLifecycleChannelManager,
sharedServer.metaProps.clusterId, sharedServer.metaProps.clusterId,
listenerInfo.toBrokerRegistrationRequest, listenerInfo.toBrokerRegistrationRequest,
featuresRemapped featuresRemapped,
logManager.readBrokerEpochFromCleanShutdownFiles()
) )
// If the BrokerLifecycleManager's initial catch-up future fails, it means we timed out // If the BrokerLifecycleManager's initial catch-up future fails, it means we timed out
// or are shutting down before we could catch up. Therefore, also fail the firstPublishFuture. // or are shutting down before we could catch up. Therefore, also fail the firstPublishFuture.
@ -645,7 +646,7 @@ class BrokerServer(
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this) CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
if (logManager != null) if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this) CoreUtils.swallow(logManager.shutdown(lifecycleManager.brokerEpoch), this)
// Close remote log manager to give a chance to any of its underlying clients // Close remote log manager to give a chance to any of its underlying clients
// (especially in RemoteStorageManager and RemoteLogMetadataManager) to close gracefully. // (especially in RemoteStorageManager and RemoteLogMetadataManager) to close gracefully.

3
core/src/main/scala/kafka/server/KafkaServer.scala

@ -437,7 +437,8 @@ class KafkaServer(
brokerToQuorumChannelManager, brokerToQuorumChannelManager,
kraftMetaProps.clusterId, kraftMetaProps.clusterId,
networkListeners, networkListeners,
ibpAsFeature ibpAsFeature,
-1
) )
logger.debug("Start RaftManager") logger.debug("Start RaftManager")
} }

57
core/src/test/java/kafka/log/CleanShutdownFileHandlerTest.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.nio.file.Files;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 10)
class CleanShutdownFileHandlerTest {
@Test
public void testCleanShutdownFileBasic() {
File logDir;
logDir = assertDoesNotThrow(() -> Files.createTempDirectory("kafka-cleanShutdownFile").toFile());
CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir.getPath());
assertDoesNotThrow(() -> cleanShutdownFileHandler.write(10L));
assertTrue(cleanShutdownFileHandler.exists());
assertEquals(10L, cleanShutdownFileHandler.read());
assertDoesNotThrow(() -> cleanShutdownFileHandler.delete());
assertFalse(cleanShutdownFileHandler.exists());
}
@Test
public void testCleanShutdownFileNonExist() {
File logDir;
logDir = assertDoesNotThrow(() -> Files.createTempDirectory("kafka-cleanShutdownFile").toFile());
CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir.getPath());
assertDoesNotThrow(() -> cleanShutdownFileHandler.write(10L, 0));
assertTrue(cleanShutdownFileHandler.exists());
assertDoesNotThrow(() -> cleanShutdownFileHandler.delete());
assertFalse(cleanShutdownFileHandler.exists());
assertEquals(-1L, cleanShutdownFileHandler.read());
}
}

12
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala

@ -181,25 +181,25 @@ class LogLoaderTest {
(logManager, runLoadLogs) (logManager, runLoadLogs)
} }
val cleanShutdownFile = new File(logDir, LogLoader.CleanShutdownFile) val cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir.getPath)
locally { locally {
val (logManager, _) = initializeLogManagerForSimulatingErrorTest() val (logManager, _) = initializeLogManagerForSimulatingErrorTest()
// Load logs after a clean shutdown // Load logs after a clean shutdown
Files.createFile(cleanShutdownFile.toPath) cleanShutdownFileHandler.write(0L)
cleanShutdownInterceptedValue = false cleanShutdownInterceptedValue = false
var defaultConfig = logManager.currentDefaultConfig var defaultConfig = logManager.currentDefaultConfig
logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
assertTrue(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag") assertTrue(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag")
assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not exist after loadLogs has completed") assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed")
// Load logs without clean shutdown file // Load logs without clean shutdown file
cleanShutdownInterceptedValue = true cleanShutdownInterceptedValue = true
defaultConfig = logManager.currentDefaultConfig defaultConfig = logManager.currentDefaultConfig
logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
assertFalse(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag") assertFalse(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag")
assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not exist after loadLogs has completed") assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed")
// Create clean shutdown file and then simulate error while loading logs such that log loading does not complete. // Create clean shutdown file and then simulate error while loading logs such that log loading does not complete.
Files.createFile(cleanShutdownFile.toPath) cleanShutdownFileHandler.write(0L)
logManager.shutdown() logManager.shutdown()
} }
@ -210,7 +210,7 @@ class LogLoaderTest {
simulateError.hasError = true simulateError.hasError = true
simulateError.errorType = ErrorTypes.RuntimeException simulateError.errorType = ErrorTypes.RuntimeException
assertThrows(classOf[RuntimeException], runLoadLogs) assertThrows(classOf[RuntimeException], runLoadLogs)
assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed") assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not have existed")
assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown") assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown")
// Simulate Kafka storage error with IOException cause // Simulate Kafka storage error with IOException cause

39
core/src/test/scala/unit/kafka/log/LogManagerTest.scala

@ -127,10 +127,39 @@ class LogManagerTest {
// This should cause log1.close() to fail during LogManger shutdown sequence. // This should cause log1.close() to fail during LogManger shutdown sequence.
FileUtils.deleteDirectory(logFile1) FileUtils.deleteDirectory(logFile1)
logManagerForTest.get.shutdown() logManagerForTest.get.shutdown(3)
assertFalse(Files.exists(new File(logDir1, LogLoader.CleanShutdownFile).toPath)) assertFalse(Files.exists(new File(logDir1, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
assertTrue(Files.exists(new File(logDir2, LogLoader.CleanShutdownFile).toPath)) assertTrue(Files.exists(new File(logDir2, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
assertEquals(-1L, logManagerForTest.get.readBrokerEpochFromCleanShutdownFiles())
} finally {
logManagerForTest.foreach(manager => manager.liveLogDirs.foreach(Utils.delete))
}
}
@Test
def testCleanShutdownFileWithBrokerEpoch(): Unit = {
val logDir1 = TestUtils.tempDir()
val logDir2 = TestUtils.tempDir()
var logManagerForTest: Option[LogManager] = Option.empty
try {
logManagerForTest = Some(createLogManager(Seq(logDir1, logDir2)))
assertEquals(2, logManagerForTest.get.liveLogDirs.size)
logManagerForTest.get.startup(Set.empty)
logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 1), topicId = None)
val logFile1 = new File(logDir1, name + "-0")
assertTrue(logFile1.exists)
val logFile2 = new File(logDir2, name + "-1")
assertTrue(logFile2.exists)
logManagerForTest.get.shutdown(3)
assertTrue(Files.exists(new File(logDir1, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
assertTrue(Files.exists(new File(logDir2, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
assertEquals(3L, logManagerForTest.get.readBrokerEpochFromCleanShutdownFiles())
} finally { } finally {
logManagerForTest.foreach(manager => manager.liveLogDirs.foreach(Utils.delete)) logManagerForTest.foreach(manager => manager.liveLogDirs.foreach(Utils.delete))
} }
@ -160,7 +189,7 @@ class LogManagerTest {
// 2. simulate unclean shutdown by deleting clean shutdown marker file // 2. simulate unclean shutdown by deleting clean shutdown marker file
logManager.shutdown() logManager.shutdown()
assertTrue(Files.deleteIfExists(new File(logDir, LogLoader.CleanShutdownFile).toPath)) assertTrue(Files.deleteIfExists(new File(logDir, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
// 3. create a new LogManager and start it in a different thread // 3. create a new LogManager and start it in a different thread
@volatile var loadLogCalled = 0 @volatile var loadLogCalled = 0
@ -186,7 +215,7 @@ class LogManagerTest {
logManager = null logManager = null
// 5. verify that CleanShutdownFile is not created under logDir // 5. verify that CleanShutdownFile is not created under logDir
assertFalse(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath)) assertFalse(Files.exists(new File(logDir, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
} }
/** /**

19
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala

@ -22,7 +22,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.Node import org.apache.kafka.common.Node
import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, BrokerRegistrationResponseData} import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, BrokerRegistrationResponseData}
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationResponse} import org.apache.kafka.common.requests.{AbstractRequest, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
import org.apache.kafka.metadata.BrokerState import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{Test, Timeout} import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -55,7 +55,7 @@ class BrokerLifecycleManagerTest {
assertEquals(BrokerState.NOT_RUNNING, manager.state) assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(), manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners, context.mockChannelManager, context.clusterId, context.advertisedListeners,
Collections.emptyMap()) Collections.emptyMap(), -1)
TestUtils.retry(60000) { TestUtils.retry(60000) {
assertEquals(BrokerState.STARTING, manager.state) assertEquals(BrokerState.STARTING, manager.state)
} }
@ -69,17 +69,20 @@ class BrokerLifecycleManagerTest {
val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false) val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false)
val controllerNode = new Node(3000, "localhost", 8021) val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode) context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
manager.start(() => context.highestMetadataOffset.get(), manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners, context.mockChannelManager, context.clusterId, context.advertisedListeners,
Collections.emptyMap()) Collections.emptyMap(), 10L)
TestUtils.retry(60000) {
assertEquals(1, context.mockChannelManager.unsentQueue.size)
assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
}
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
TestUtils.retry(10000) { TestUtils.retry(10000) {
context.poll() context.poll()
assertEquals(1000L, manager.brokerEpoch) assertEquals(1000L, manager.brokerEpoch)
} }
manager.close() manager.close()
} }
@Test @Test
@ -98,7 +101,7 @@ class BrokerLifecycleManagerTest {
assertEquals(1, context.mockClient.futureResponses().size) assertEquals(1, context.mockClient.futureResponses().size)
manager.start(() => context.highestMetadataOffset.get(), manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners, context.mockChannelManager, context.clusterId, context.advertisedListeners,
Collections.emptyMap()) Collections.emptyMap(), -1)
// We should send the first registration request and get a failure immediately // We should send the first registration request and get a failure immediately
TestUtils.retry(60000) { TestUtils.retry(60000) {
context.poll() context.poll()
@ -136,7 +139,7 @@ class BrokerLifecycleManagerTest {
new BrokerHeartbeatResponseData().setIsCaughtUp(true)), controllerNode) new BrokerHeartbeatResponseData().setIsCaughtUp(true)), controllerNode)
manager.start(() => context.highestMetadataOffset.get(), manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners, context.mockChannelManager, context.clusterId, context.advertisedListeners,
Collections.emptyMap()) Collections.emptyMap(), -1)
TestUtils.retry(10000) { TestUtils.retry(10000) {
context.poll() context.poll()
manager.eventQueue.wakeup() manager.eventQueue.wakeup()

2
core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala

@ -29,7 +29,7 @@ class MockNodeToControllerChannelManager(
val retryTimeoutMs: Int = 60000, val retryTimeoutMs: Int = 60000,
val requestTimeoutMs: Int = 30000 val requestTimeoutMs: Int = 30000
) extends NodeToControllerChannelManager { ) extends NodeToControllerChannelManager {
private val unsentQueue = new java.util.ArrayDeque[NodeToControllerQueueItem]() val unsentQueue = new java.util.ArrayDeque[NodeToControllerQueueItem]()
client.setNodeApiVersions(controllerApiVersions) client.setNodeApiVersions(controllerApiVersions)

9
core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala

@ -17,7 +17,8 @@
package kafka.server.epoch package kafka.server.epoch
import kafka.log.{LogLoader, UnifiedLog} import kafka.log.CleanShutdownFileHandler
import kafka.log.UnifiedLog
import kafka.server.KafkaConfig._ import kafka.server.KafkaConfig._
import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness} import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
import kafka.tools.DumpLogSegments import kafka.tools.DumpLogSegments
@ -36,7 +37,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.io.{File, RandomAccessFile} import java.io.{File, RandomAccessFile}
import java.util.{Collections, Properties} import java.util.{Collections, Properties}
import scala.collection.Seq
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
/** /**
@ -48,7 +48,6 @@ import scala.jdk.CollectionConverters._
* A test which validates the end to end workflow is also included. * A test which validates the end to end workflow is also included.
*/ */
class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging { class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging {
// Set this to IBP_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case // Set this to IBP_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case
override def metadataVersion = MetadataVersion.latest override def metadataVersion = MetadataVersion.latest
val topic = "topic1" val topic = "topic1"
@ -152,7 +151,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
broker100.shutdown() broker100.shutdown()
//Delete the clean shutdown file to simulate crash //Delete the clean shutdown file to simulate crash
new File(broker100.config.logDirs.head, LogLoader.CleanShutdownFile).delete() new File(broker100.config.logDirs.head, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).delete()
//Delete 5 messages from the leader's log on 100 //Delete 5 messages from the leader's log on 100
deleteMessagesFromLogFile(5 * msg.length, broker100, 0) deleteMessagesFromLogFile(5 * msg.length, broker100, 0)
@ -199,7 +198,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
brokers.foreach { b => b.shutdown() } brokers.foreach { b => b.shutdown() }
//Delete the clean shutdown file to simulate crash //Delete the clean shutdown file to simulate crash
new File(brokers(0).config.logDirs(0), LogLoader.CleanShutdownFile).delete() new File(brokers(0).config.logDirs(0), CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).delete()
//Delete half the messages from the log file //Delete half the messages from the log file
deleteMessagesFromLogFile(getLogFile(brokers(0), 0).length() / 2, brokers(0), 0) deleteMessagesFromLogFile(getLogFile(brokers(0), 0).length() / 2, brokers(0), 0)

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java

@ -262,7 +262,7 @@ public class ReplicaFetcherThreadBenchmark {
public void tearDown() throws IOException, InterruptedException { public void tearDown() throws IOException, InterruptedException {
metrics.close(); metrics.close();
replicaManager.shutdown(false); replicaManager.shutdown(false);
logManager.shutdown(); logManager.shutdown(-1L);
scheduler.shutdown(); scheduler.shutdown();
Utils.delete(logDir); Utils.delete(logDir);
} }

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java

@ -143,7 +143,7 @@ public class PartitionMakeFollowerBenchmark {
@TearDown(Level.Trial) @TearDown(Level.Trial)
public void tearDown() throws IOException, InterruptedException { public void tearDown() throws IOException, InterruptedException {
executorService.shutdownNow(); executorService.shutdownNow();
logManager.shutdown(); logManager.shutdown(-1L);
scheduler.shutdown(); scheduler.shutdown();
Utils.delete(logDir); Utils.delete(logDir);
} }

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java

@ -147,7 +147,7 @@ public class UpdateFollowerFetchStateBenchmark {
@TearDown(Level.Trial) @TearDown(Level.Trial)
public void tearDown() throws InterruptedException { public void tearDown() throws InterruptedException {
logManager.shutdown(); logManager.shutdown(-1L);
scheduler.shutdown(); scheduler.shutdown();
} }

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java

@ -173,7 +173,7 @@ public class PartitionCreationBench {
@TearDown(Level.Invocation) @TearDown(Level.Invocation)
public void tearDown() throws Exception { public void tearDown() throws Exception {
this.replicaManager.shutdown(false); this.replicaManager.shutdown(false);
logManager.shutdown(); logManager.shutdown(-1L);
this.metrics.close(); this.metrics.close();
this.scheduler.shutdown(); this.scheduler.shutdown();
this.quotaManagers.shutdown(); this.quotaManagers.shutdown();

7
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

@ -317,7 +317,8 @@ public class ClusterControlManager {
public ControllerResult<BrokerRegistrationReply> registerBroker( public ControllerResult<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request, BrokerRegistrationRequestData request,
long brokerEpoch, long brokerEpoch,
FinalizedControllerFeatures finalizedFeatures) { FinalizedControllerFeatures finalizedFeatures,
short version) {
if (heartbeatManager == null) { if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active."); throw new RuntimeException("ClusterControlManager is not active.");
} }
@ -327,6 +328,10 @@ public class ClusterControlManager {
} }
int brokerId = request.brokerId(); int brokerId = request.brokerId();
BrokerRegistration existing = brokerRegistrations.get(brokerId); BrokerRegistration existing = brokerRegistrations.get(brokerId);
if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
log.debug("Received an unclean shutdown request");
}
if (existing != null) { if (existing != null) {
if (heartbeatManager.hasValidSession(brokerId)) { if (heartbeatManager.hasValidSession(brokerId)) {
if (!existing.incarnationId().equals(request.incarnationId())) { if (!existing.incarnationId().equals(request.incarnationId())) {

2
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@ -2157,7 +2157,7 @@ public final class QuorumController implements Controller {
() -> { () -> {
ControllerResult<BrokerRegistrationReply> result = clusterControl. ControllerResult<BrokerRegistrationReply> result = clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(), featureControl. registerBroker(request, offsetControl.nextWriteOffset(), featureControl.
finalizedFeatures(Long.MAX_VALUE)); finalizedFeatures(Long.MAX_VALUE), context.requestHeader().requestApiVersion());
rescheduleMaybeFenceStaleBrokers(); rescheduleMaybeFenceStaleBrokers();
return result; return result;
}, },

12
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java

@ -264,7 +264,8 @@ public class ClusterControlManagerTest {
setRack(null). setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L, 123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L))); new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
(short) 1));
} }
@ParameterizedTest @ParameterizedTest
@ -294,7 +295,8 @@ public class ClusterControlManagerTest {
setRack(null). setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L, 123L,
new FinalizedControllerFeatures(Collections.emptyMap(), 456L)); new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
(short) 1);
short expectedVersion = metadataVersion.registerBrokerRecordVersion(); short expectedVersion = metadataVersion.registerBrokerRecordVersion();
@ -517,7 +519,8 @@ public class ClusterControlManagerTest {
setRack(null). setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L, 123L,
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage()); featureControl.finalizedFeatures(Long.MAX_VALUE),
(short) 1)).getMessage());
assertEquals("Unable to register because the broker does not support version 4 of " + assertEquals("Unable to register because the broker does not support version 4 of " +
"metadata.version. It wants a version between 7 and 7, inclusive.", "metadata.version. It wants a version between 7 and 7, inclusive.",
@ -534,7 +537,8 @@ public class ClusterControlManagerTest {
setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())). setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L, 123L,
featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage()); featureControl.finalizedFeatures(Long.MAX_VALUE),
(short) 1)).getMessage());
} }
@Test @Test

Loading…
Cancel
Save