diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 849c45e5b19..983f077adc1 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -73,16 +73,19 @@
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 98658a3f04a..6c51eac301c 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -14,12 +14,14 @@
// limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
+//
+// Version 2 adds the PreviousBrokerEpoch field as part of KIP-966.
{
"apiKey":62,
"type": "request",
"listeners": ["controller"],
"name": "BrokerRegistrationRequest",
- "validVersions": "0-1",
+ "validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@@ -53,6 +55,8 @@
{ "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The rack which this broker is in." },
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
- "about": "If the required configurations for ZK migration are present, this value is set to true" }
+ "about": "If the required configurations for ZK migration are present, this value is set to true" },
+ { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
+ "about": "The epoch before a clean shutdown." }
]
}
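
For illustration, a minimal sketch of how a version-2 registration carries the new field. This assumes Kafka's generated `BrokerRegistrationRequestData` message class with the fluent setter derived from the field name above; the class name `PreviousBrokerEpochSketch` is illustrative, not part of the patch.

```java
import org.apache.kafka.common.message.BrokerRegistrationRequestData;

public class PreviousBrokerEpochSketch {
    public static void main(String[] args) {
        // PreviousBrokerEpoch defaults to -1, i.e. "no clean-shutdown proof".
        BrokerRegistrationRequestData data = new BrokerRegistrationRequestData()
                .setBrokerId(1)
                .setPreviousBrokerEpoch(42L); // the epoch stored at the last clean shutdown
        System.out.println(data.previousBrokerEpoch()); // 42
    }
}
```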
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
index 7d45b0fecab..3b483abfc05 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
@@ -14,11 +14,13 @@
// limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
+//
+// Version 2 adds support for the PreviousBrokerEpoch field in the request, as part of KIP-966.
{
"apiKey": 62,
"type": "response",
"name": "BrokerRegistrationResponse",
- "validVersions": "0-1",
+ "validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git a/core/src/main/java/kafka/log/CleanShutdownFileHandler.java b/core/src/main/java/kafka/log/CleanShutdownFileHandler.java
new file mode 100644
index 00000000000..a14cc6a1b66
--- /dev/null
+++ b/core/src/main/java/kafka/log/CleanShutdownFileHandler.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Clean shutdown file that indicates the broker was cleanly shut down in 0.8 and higher.
+ * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
+ * avoided by passing in the recovery point, however finding the correct position to do this
+ * requires accessing the offset index which may not be safe in an unclean shutdown.
+ * For more information see the discussion in PR#2104
+ *
+ * The clean shutdown file can also store the broker epoch, which the broker registration uses to
+ * demonstrate that the last reboot was a clean shutdown (KIP-966).
+ */
+public class CleanShutdownFileHandler {
+ public static final String CLEAN_SHUTDOWN_FILE_NAME = ".kafka_cleanshutdown";
+ private final File cleanShutdownFile;
+ private static final int CURRENT_VERSION = 0;
+ private final Logger logger;
+
+ private enum Fields {
+ VERSION,
+ BROKER_EPOCH;
+
+ @Override
+ public String toString() {
+ return name().toLowerCase(Locale.ROOT);
+ }
+ }
+
+ public CleanShutdownFileHandler(String dirPath) {
+ logger = new LogContext().logger(CleanShutdownFileHandler.class);
+ this.cleanShutdownFile = new File(dirPath, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME);
+ }
+
+ public void write(long brokerEpoch) throws Exception {
+ write(brokerEpoch, CURRENT_VERSION);
+ }
+
+ // Visible for testing.
+ void write(long brokerEpoch, int version) throws Exception {
+ FileOutputStream os = new FileOutputStream(cleanShutdownFile);
+ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os, StandardCharsets.UTF_8));
+ try {
+ Map<String, String> payload = new HashMap<>();
+ payload.put(Fields.VERSION.toString(), Integer.toString(version));
+ payload.put(Fields.BROKER_EPOCH.toString(), Long.toString(brokerEpoch));
+ bw.write(new ObjectMapper().writeValueAsString(payload));
+ bw.flush();
+ os.getFD().sync();
+ } finally {
+ bw.close();
+ os.close();
+ }
+ }
+
+ public long read() {
+ long brokerEpoch = -1L;
+ try {
+ String text = Utils.readFileAsString(cleanShutdownFile.toPath().toString());
+ Map<String, String> content = new ObjectMapper().readValue(text, HashMap.class);
+
+ brokerEpoch = Long.parseLong(content.getOrDefault(Fields.BROKER_EPOCH.toString(), "-1"));
+ } catch (Exception e) {
+ logger.warn("Fail to read the clean shutdown file in " + cleanShutdownFile.toPath() + ":" + e);
+ }
+ return brokerEpoch;
+ }
+
+ public void delete() throws Exception {
+ Files.deleteIfExists(cleanShutdownFile.toPath());
+ }
+
+ public boolean exists() {
+ return cleanShutdownFile.exists();
+ }
+
+ @Override
+ public String toString() {
+ return "CleanShutdownFile=(" + "file=" + cleanShutdownFile.toString() + ')';
+ }
+}
\ No newline at end of file
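
A short usage sketch of the handler above; the temp directory is illustrative. On disk the marker is Jackson JSON with string values, e.g. `{"version":"0","broker_epoch":"10"}` (key order is not guaranteed by the HashMap).

```java
import java.nio.file.Files;

import kafka.log.CleanShutdownFileHandler;

public class CleanShutdownFileHandlerExample {
    public static void main(String[] args) throws Exception {
        String logDir = Files.createTempDirectory("kafka-log-dir").toString();
        CleanShutdownFileHandler handler = new CleanShutdownFileHandler(logDir);

        handler.write(10L);                   // writes and fsyncs .kafka_cleanshutdown
        System.out.println(handler.exists()); // true
        System.out.println(handler.read());   // 10

        handler.delete();
        System.out.println(handler.read());   // -1 (missing file logs a warning)
    }
}
```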
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index ebfb3e66949..1616be58b82 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -35,19 +35,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.{Set, mutable}
import scala.jdk.CollectionConverters._
-object LogLoader extends Logging {
-
- /**
- * Clean shutdown file that indicates the broker was cleanly shutdown in 0.8 and higher.
- * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
- * avoided by passing in the recovery point, however finding the correct position to do this
- * requires accessing the offset index which may not be safe in an unclean shutdown.
- * For more information see the discussion in PR#2104
- */
- val CleanShutdownFile = ".kafka_cleanshutdown"
-}
-
-
/**
* @param dir The directory from which log segments need to be loaded
* @param topicPartition The topic partition associated with the log being loaded
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index e420b0df435..a1f4e582355 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -408,11 +408,11 @@ class LogManager(logDirs: Seq[File],
new LogRecoveryThreadFactory(logDirAbsolutePath))
threadPools.append(pool)
- val cleanShutdownFile = new File(dir, LogLoader.CleanShutdownFile)
- if (cleanShutdownFile.exists) {
+ val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
+ if (cleanShutdownFileHandler.exists()) {
// Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
// so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
- Files.deleteIfExists(cleanShutdownFile.toPath)
+ cleanShutdownFileHandler.delete()
hadCleanShutdown = true
}
hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
@@ -625,7 +625,7 @@ class LogManager(logDirs: Seq[File],
/**
* Close all the logs
*/
- def shutdown(): Unit = {
+ def shutdown(brokerEpoch: Long = -1): Unit = {
info("Shutting down.")
metricsGroup.removeMetric("OfflineLogDirectoryCount")
@@ -684,8 +684,9 @@ class LogManager(logDirs: Seq[File],
val logDirAbsolutePath = dir.getAbsolutePath
if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
- debug(s"Writing clean shutdown marker at $dir")
- CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+ val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
+ debug(s"Writing clean shutdown marker at $dir with broker epoch=$brokerEpoch")
+ CoreUtils.swallow(cleanShutdownFileHandler.write(brokerEpoch), this)
}
}
}
@@ -1409,6 +1410,29 @@ class LogManager(logDirs: Seq[File],
None
}
}
+
+ def readBrokerEpochFromCleanShutdownFiles(): Long = {
+ // Verify that all the log dirs have the same broker epoch in their clean shutdown files. If any dir is
+ // not live, fail the broker epoch check.
+ if (liveLogDirs.size < logDirs.size) {
+ return -1L
+ }
+ var brokerEpoch = -1L
+ for (dir <- liveLogDirs) {
+ val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
+ val currentBrokerEpoch = cleanShutdownFileHandler.read
+ if (currentBrokerEpoch == -1L) {
+ info(s"Unable to read the broker epoch in ${dir.toString}.")
+ return -1L
+ }
+ if (brokerEpoch != -1 && currentBrokerEpoch != brokerEpoch) {
+ info(s"Found different broker epochs in ${dir.toString}. Other=$brokerEpoch vs current=$currentBrokerEpoch.")
+ return -1L
+ }
+ brokerEpoch = currentBrokerEpoch
+ }
+ brokerEpoch
+ }
}
object LogManager {
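
The cross-directory rule implemented by `readBrokerEpochFromCleanShutdownFiles` above can be restated as a pure function. This is a hedged sketch, not patch code: any offline dir, unreadable file, or disagreement between dirs collapses the result to -1 ("unknown").

```java
import java.util.List;

public class BrokerEpochConsistency {
    // epochs holds one read() result per live log dir; totalDirs counts all configured dirs.
    static long combine(List<Long> epochs, int totalDirs) {
        if (epochs.size() < totalDirs) {
            return -1L; // an offline dir fails the check
        }
        long brokerEpoch = -1L;
        for (long current : epochs) {
            if (current == -1L) {
                return -1L; // missing or unreadable clean shutdown file
            }
            if (brokerEpoch != -1L && current != brokerEpoch) {
                return -1L; // dirs disagree on the epoch
            }
            brokerEpoch = current;
        }
        return brokerEpoch;
    }
}
```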
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 10074d4a5f5..112c6f57987 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -187,6 +187,11 @@ class BrokerLifecycleManager(
*/
private var _channelManager: NodeToControllerChannelManager = _
+ /**
+ * The broker epoch from the previous run, or -1 if it could not be determined.
+ */
+ @volatile private var previousBrokerEpoch: Long = -1L
+
/**
* The event queue.
*/
@@ -201,12 +206,18 @@ class BrokerLifecycleManager(
* @param highestMetadataOffsetProvider Provides the current highest metadata offset.
* @param channelManager The NodeToControllerChannelManager to use.
* @param clusterId The cluster ID.
+ * @param advertisedListeners The advertised listeners for this broker.
+ * @param supportedFeatures The features for this broker.
+ * @param previousBrokerEpoch The broker epoch before the reboot.
*/
def start(highestMetadataOffsetProvider: () => Long,
channelManager: NodeToControllerChannelManager,
clusterId: String,
advertisedListeners: ListenerCollection,
- supportedFeatures: util.Map[String, VersionRange]): Unit = {
+ supportedFeatures: util.Map[String, VersionRange],
+ previousBrokerEpoch: Long): Unit = {
+ this.previousBrokerEpoch = previousBrokerEpoch
eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
channelManager, clusterId, advertisedListeners, supportedFeatures))
}
@@ -310,7 +321,8 @@ class BrokerLifecycleManager(
setFeatures(features).
setIncarnationId(incarnationId).
setListeners(_advertisedListeners).
- setRack(rack.orNull)
+ setRack(rack.orNull).
+ setPreviousBrokerEpoch(previousBrokerEpoch)
if (isDebugEnabled) {
debug(s"Sending broker registration $data")
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index fe8e5acb9fa..06ba81e5258 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -335,7 +335,8 @@ class BrokerServer(
brokerLifecycleChannelManager,
sharedServer.metaProps.clusterId,
listenerInfo.toBrokerRegistrationRequest,
- featuresRemapped
+ featuresRemapped,
+ logManager.readBrokerEpochFromCleanShutdownFiles()
)
// If the BrokerLifecycleManager's initial catch-up future fails, it means we timed out
// or are shutting down before we could catch up. Therefore, also fail the firstPublishFuture.
@@ -645,7 +646,7 @@ class BrokerServer(
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
if (logManager != null)
- CoreUtils.swallow(logManager.shutdown(), this)
+ CoreUtils.swallow(logManager.shutdown(lifecycleManager.brokerEpoch), this)
// Close remote log manager to give a chance to any of its underlying clients
// (especially in RemoteStorageManager and RemoteLogMetadataManager) to close gracefully.
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a5a05c977e5..038df14cc91 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -437,7 +437,8 @@ class KafkaServer(
brokerToQuorumChannelManager,
kraftMetaProps.clusterId,
networkListeners,
- ibpAsFeature
+ ibpAsFeature,
+ -1
)
logger.debug("Start RaftManager")
}
diff --git a/core/src/test/java/kafka/log/CleanShutdownFileHandlerTest.java b/core/src/test/java/kafka/log/CleanShutdownFileHandlerTest.java
new file mode 100644
index 00000000000..977bcb758e3
--- /dev/null
+++ b/core/src/test/java/kafka/log/CleanShutdownFileHandlerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 10)
+class CleanShutdownFileHandlerTest {
+ @Test
+ public void testCleanShutdownFileBasic() {
+ File logDir = assertDoesNotThrow(() -> Files.createTempDirectory("kafka-cleanShutdownFile").toFile());
+ CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir.getPath());
+ assertDoesNotThrow(() -> cleanShutdownFileHandler.write(10L));
+ assertTrue(cleanShutdownFileHandler.exists());
+ assertEquals(10L, cleanShutdownFileHandler.read());
+ assertDoesNotThrow(() -> cleanShutdownFileHandler.delete());
+ assertFalse(cleanShutdownFileHandler.exists());
+ }
+
+ @Test
+ public void testCleanShutdownFileNonExist() {
+ File logDir = assertDoesNotThrow(() -> Files.createTempDirectory("kafka-cleanShutdownFile").toFile());
+ CleanShutdownFileHandler cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir.getPath());
+ assertDoesNotThrow(() -> cleanShutdownFileHandler.write(10L, 0));
+ assertTrue(cleanShutdownFileHandler.exists());
+ assertDoesNotThrow(() -> cleanShutdownFileHandler.delete());
+ assertFalse(cleanShutdownFileHandler.exists());
+ assertEquals(-1L, cleanShutdownFileHandler.read());
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index b9b41259d4b..da19c7055ac 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -181,25 +181,25 @@ class LogLoaderTest {
(logManager, runLoadLogs)
}
- val cleanShutdownFile = new File(logDir, LogLoader.CleanShutdownFile)
+ val cleanShutdownFileHandler = new CleanShutdownFileHandler(logDir.getPath)
locally {
val (logManager, _) = initializeLogManagerForSimulatingErrorTest()
// Load logs after a clean shutdown
- Files.createFile(cleanShutdownFile.toPath)
+ cleanShutdownFileHandler.write(0L)
cleanShutdownInterceptedValue = false
var defaultConfig = logManager.currentDefaultConfig
logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
assertTrue(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag")
- assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not exist after loadLogs has completed")
+ assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed")
// Load logs without clean shutdown file
cleanShutdownInterceptedValue = true
defaultConfig = logManager.currentDefaultConfig
logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
assertFalse(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag")
- assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not exist after loadLogs has completed")
+ assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed")
// Create clean shutdown file and then simulate error while loading logs such that log loading does not complete.
- Files.createFile(cleanShutdownFile.toPath)
+ cleanShutdownFileHandler.write(0L)
logManager.shutdown()
}
@@ -210,7 +210,7 @@ class LogLoaderTest {
simulateError.hasError = true
simulateError.errorType = ErrorTypes.RuntimeException
assertThrows(classOf[RuntimeException], runLoadLogs)
- assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed")
+ assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not have existed")
assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown")
// Simulate Kafka storage error with IOException cause
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7e35857c5f7..eda800abb11 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -127,10 +127,39 @@ class LogManagerTest {
// This should cause log1.close() to fail during LogManger shutdown sequence.
FileUtils.deleteDirectory(logFile1)
- logManagerForTest.get.shutdown()
+ logManagerForTest.get.shutdown(3)
- assertFalse(Files.exists(new File(logDir1, LogLoader.CleanShutdownFile).toPath))
- assertTrue(Files.exists(new File(logDir2, LogLoader.CleanShutdownFile).toPath))
+ assertFalse(Files.exists(new File(logDir1, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
+ assertTrue(Files.exists(new File(logDir2, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
+ assertEquals(-1L, logManagerForTest.get.readBrokerEpochFromCleanShutdownFiles())
+ } finally {
+ logManagerForTest.foreach(manager => manager.liveLogDirs.foreach(Utils.delete))
+ }
+ }
+
+ @Test
+ def testCleanShutdownFileWithBrokerEpoch(): Unit = {
+ val logDir1 = TestUtils.tempDir()
+ val logDir2 = TestUtils.tempDir()
+ var logManagerForTest: Option[LogManager] = Option.empty
+ try {
+ logManagerForTest = Some(createLogManager(Seq(logDir1, logDir2)))
+
+ assertEquals(2, logManagerForTest.get.liveLogDirs.size)
+ logManagerForTest.get.startup(Set.empty)
+ logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
+ logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 1), topicId = None)
+
+ val logFile1 = new File(logDir1, name + "-0")
+ assertTrue(logFile1.exists)
+ val logFile2 = new File(logDir2, name + "-1")
+ assertTrue(logFile2.exists)
+
+ logManagerForTest.get.shutdown(3)
+
+ assertTrue(Files.exists(new File(logDir1, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
+ assertTrue(Files.exists(new File(logDir2, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
+ assertEquals(3L, logManagerForTest.get.readBrokerEpochFromCleanShutdownFiles())
} finally {
logManagerForTest.foreach(manager => manager.liveLogDirs.foreach(Utils.delete))
}
@@ -160,7 +189,7 @@ class LogManagerTest {
// 2. simulate unclean shutdown by deleting clean shutdown marker file
logManager.shutdown()
- assertTrue(Files.deleteIfExists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+ assertTrue(Files.deleteIfExists(new File(logDir, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
// 3. create a new LogManager and start it in a different thread
@volatile var loadLogCalled = 0
@@ -186,7 +215,7 @@ class LogManagerTest {
logManager = null
// 5. verify that CleanShutdownFile is not created under logDir
- assertFalse(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+ assertFalse(Files.exists(new File(logDir, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).toPath))
}
/**
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 588508ae3d0..90d93466c29 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.Node
import org.apache.kafka.common.message.{BrokerHeartbeatResponseData, BrokerRegistrationResponseData}
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{AbstractRequest, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, BrokerRegistrationResponse}
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.api.Assertions._
@@ -55,7 +55,7 @@ class BrokerLifecycleManagerTest {
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
- Collections.emptyMap())
+ Collections.emptyMap(), -1)
TestUtils.retry(60000) {
assertEquals(BrokerState.STARTING, manager.state)
}
@@ -69,17 +69,20 @@ class BrokerLifecycleManagerTest {
val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
- context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
- new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
- Collections.emptyMap())
+ Collections.emptyMap(), 10L)
+ TestUtils.retry(60000) {
+ assertEquals(1, context.mockChannelManager.unsentQueue.size)
+ assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
+ }
+ context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
+ new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
TestUtils.retry(10000) {
context.poll()
assertEquals(1000L, manager.brokerEpoch)
}
manager.close()
-
}
@Test
@@ -98,7 +101,7 @@ class BrokerLifecycleManagerTest {
assertEquals(1, context.mockClient.futureResponses().size)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
- Collections.emptyMap())
+ Collections.emptyMap(), -1)
// We should send the first registration request and get a failure immediately
TestUtils.retry(60000) {
context.poll()
@@ -136,7 +139,7 @@ class BrokerLifecycleManagerTest {
new BrokerHeartbeatResponseData().setIsCaughtUp(true)), controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId, context.advertisedListeners,
- Collections.emptyMap())
+ Collections.emptyMap(), -1)
TestUtils.retry(10000) {
context.poll()
manager.eventQueue.wakeup()
diff --git a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index 1172e266e76..6c10ba89db7 100644
--- a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
+++ b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -29,7 +29,7 @@ class MockNodeToControllerChannelManager(
val retryTimeoutMs: Int = 60000,
val requestTimeoutMs: Int = 30000
) extends NodeToControllerChannelManager {
- private val unsentQueue = new java.util.ArrayDeque[NodeToControllerQueueItem]()
+ val unsentQueue = new java.util.ArrayDeque[NodeToControllerQueueItem]()
client.setNodeApiVersions(controllerApiVersions)
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 0b49b2f732d..f8571316f0f 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -17,7 +17,8 @@
package kafka.server.epoch
-import kafka.log.{LogLoader, UnifiedLog}
+import kafka.log.CleanShutdownFileHandler
+import kafka.log.UnifiedLog
import kafka.server.KafkaConfig._
import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
import kafka.tools.DumpLogSegments
@@ -36,7 +37,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import java.io.{File, RandomAccessFile}
import java.util.{Collections, Properties}
-import scala.collection.Seq
import scala.jdk.CollectionConverters._
/**
@@ -48,7 +48,6 @@ import scala.jdk.CollectionConverters._
* A test which validates the end to end workflow is also included.
*/
class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging {
-
// Set this to IBP_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case
override def metadataVersion = MetadataVersion.latest
val topic = "topic1"
@@ -152,7 +151,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
broker100.shutdown()
//Delete the clean shutdown file to simulate crash
- new File(broker100.config.logDirs.head, LogLoader.CleanShutdownFile).delete()
+ new File(broker100.config.logDirs.head, CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).delete()
//Delete 5 messages from the leader's log on 100
deleteMessagesFromLogFile(5 * msg.length, broker100, 0)
@@ -199,7 +198,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
brokers.foreach { b => b.shutdown() }
//Delete the clean shutdown file to simulate crash
- new File(brokers(0).config.logDirs(0), LogLoader.CleanShutdownFile).delete()
+ new File(brokers(0).config.logDirs(0), CleanShutdownFileHandler.CLEAN_SHUTDOWN_FILE_NAME).delete()
//Delete half the messages from the log file
deleteMessagesFromLogFile(getLogFile(brokers(0), 0).length() / 2, brokers(0), 0)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 511f51064e1..c23b58ee74e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -262,7 +262,7 @@ public class ReplicaFetcherThreadBenchmark {
public void tearDown() throws IOException, InterruptedException {
metrics.close();
replicaManager.shutdown(false);
- logManager.shutdown();
+ logManager.shutdown(-1L);
scheduler.shutdown();
Utils.delete(logDir);
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index b28ab90818f..cb8554083a7 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -143,7 +143,7 @@ public class PartitionMakeFollowerBenchmark {
@TearDown(Level.Trial)
public void tearDown() throws IOException, InterruptedException {
executorService.shutdownNow();
- logManager.shutdown();
+ logManager.shutdown(-1L);
scheduler.shutdown();
Utils.delete(logDir);
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index eafe0e4b27e..0ebc3de2ff7 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -147,7 +147,7 @@ public class UpdateFollowerFetchStateBenchmark {
@TearDown(Level.Trial)
public void tearDown() throws InterruptedException {
- logManager.shutdown();
+ logManager.shutdown(-1L);
scheduler.shutdown();
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 0bbf934e6e8..a1b58970391 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -173,7 +173,7 @@ public class PartitionCreationBench {
@TearDown(Level.Invocation)
public void tearDown() throws Exception {
this.replicaManager.shutdown(false);
- logManager.shutdown();
+ logManager.shutdown(-1L);
this.metrics.close();
this.scheduler.shutdown();
this.quotaManagers.shutdown();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 49ddc3a53f4..fa984c1e4dc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -317,7 +317,8 @@ public class ClusterControlManager {
public ControllerResult<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request,
long brokerEpoch,
- FinalizedControllerFeatures finalizedFeatures) {
+ FinalizedControllerFeatures finalizedFeatures,
+ short version) {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
}
@@ -327,6 +328,10 @@ public class ClusterControlManager {
}
int brokerId = request.brokerId();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
+ if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
+ // TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
+ log.debug("Received an unclean shutdown request");
+ }
if (existing != null) {
if (heartbeatManager.hasValidSession(brokerId)) {
if (!existing.incarnationId().equals(request.incarnationId())) {
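
The gate added above treats a registration as proof of a clean shutdown only when all three conditions hold. A hedged restatement of that predicate (names are illustrative, not part of the patch):

```java
public class CleanShutdownGate {
    /**
     * existingEpoch is the controller's last registered epoch for this broker,
     * or null when the broker has never registered before.
     */
    static boolean provesCleanShutdown(short requestVersion, Long existingEpoch, long previousBrokerEpoch) {
        return requestVersion >= 2                   // pre-v2 requests cannot carry the epoch
            && existingEpoch != null                 // an unknown broker has nothing to match against
            && previousBrokerEpoch == existingEpoch; // the stored epoch must match the controller's view
    }
}
```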
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 5b437cec754..6e111c55138 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -2157,7 +2157,7 @@ public final class QuorumController implements Controller {
() -> {
ControllerResult<BrokerRegistrationReply> result = clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(), featureControl.
- finalizedFeatures(Long.MAX_VALUE));
+ finalizedFeatures(Long.MAX_VALUE), context.requestHeader().requestApiVersion());
rescheduleMaybeFenceStaleBrokers();
return result;
},
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 1749ec3e87a..54e73b2617b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -264,7 +264,8 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
- new FinalizedControllerFeatures(Collections.emptyMap(), 456L)));
+ new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
+ (short) 1));
}
@ParameterizedTest
@@ -294,7 +295,8 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
- new FinalizedControllerFeatures(Collections.emptyMap(), 456L));
+ new FinalizedControllerFeatures(Collections.emptyMap(), 456L),
+ (short) 1);
short expectedVersion = metadataVersion.registerBrokerRecordVersion();
@@ -517,7 +519,8 @@ public class ClusterControlManagerTest {
setRack(null).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
- featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
+ featureControl.finalizedFeatures(Long.MAX_VALUE),
+ (short) 1)).getMessage());
assertEquals("Unable to register because the broker does not support version 4 of " +
"metadata.version. It wants a version between 7 and 7, inclusive.",
@@ -534,7 +537,8 @@ public class ClusterControlManagerTest {
setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())).
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
123L,
- featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
+ featureControl.finalizedFeatures(Long.MAX_VALUE),
+ (short) 1)).getMessage());
}
@Test