diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 17af7775901..d2473058ac7 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -90,7 +90,11 @@ object ControllerState {
     def value = 12
   }
 
+  case object UncleanLeaderElectionEnable extends ControllerState {
+    def value = 13
+  }
+
   val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
     PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
-    LogDirChange, ControllerShutdown)
+    LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable)
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index eee625ef663..bc721e39f96 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -195,6 +195,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     zkClient.updateBrokerInfoInZk(newBrokerInfo)
   }
 
+  private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
+    eventManager.put(UncleanLeaderElectionEnable)
+  }
+
   private def state: ControllerState = eventManager.state
 
   /**
@@ -1009,6 +1013,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
+  case object UncleanLeaderElectionEnable extends ControllerEvent {
+
+    def state = ControllerState.UncleanLeaderElectionEnable
+
+    override def process(): Unit = {
+      if (!isActive) return
+      partitionStateMachine.triggerOnlinePartitionStateChange()
+    }
+  }
+
   case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
 
     def state = ControllerState.ControlledShutdown
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index be0ed6b1999..004b531da1b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -156,7 +156,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
-    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
     addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
@@ -501,7 +501,7 @@ object DynamicLogConfig {
   val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- ExcludedConfigs
   val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case (k, v) => (v, k) }
 }
-class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Logging {
+class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Reconfigurable with Logging {
 
   override def configure(configs: util.Map[String, _]): Unit = {}
 
@@ -517,6 +517,7 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
 
   override def reconfigure(configs: util.Map[String, _]): Unit = {
     val currentLogConfig = logManager.currentDefaultConfig
+    val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
     configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
       if (v != null) {
@@ -536,6 +537,9 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
       val logConfig = LogConfig(props.asJava)
       log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
     }
+    if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
+      server.kafkaController.enableDefaultUncleanLeaderElection()
+    }
   }
 }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 3c6c0391b68..8b70875cb78 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.SslConfigs._
 import org.apache.kafka.common.config.types.Password
@@ -429,6 +429,62 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
+  @Test
+  def testUncleanLeaderElectionEnable(): Unit = {
+    val topic = "testtopic2"
+    TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
+    val producer = ProducerBuilder().maxRetries(1000).acks(1).build()
+    val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
+    verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+    consumer.commitSync()
+
+    def partitionInfo: TopicPartitionInfo =
+      adminClients.head.describeTopics(Collections.singleton(topic)).values.get(topic).get().partitions().get(0)
+
+    val partitionInfo0 = partitionInfo
+    assertEquals(partitionInfo0.replicas.get(0), partitionInfo0.leader)
+    val leaderBroker = servers.find(_.config.brokerId == partitionInfo0.replicas.get(0).id).get
+    val followerBroker = servers.find(_.config.brokerId == partitionInfo0.replicas.get(1).id).get
+
+    // Stop follower
+    followerBroker.shutdown()
+    followerBroker.awaitShutdown()
+
+    // Produce and consume some messages when the only follower is down, this should succeed since MinIsr is 1
+    verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+    consumer.commitSync()
+
+    // Shutdown leader and startup follower
+    leaderBroker.shutdown()
+    leaderBroker.awaitShutdown()
+    followerBroker.startup()
+    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+
+    // Verify that new leader is not elected with unclean leader disabled since there are no ISRs
+    TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean leader elected")
+
+    // Enable unclean leader election
+    val newProps = new Properties
+    newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+    TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get
+    waitForConfigOnServer(controller, KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+
+    // Verify that the old follower with missing records is elected as the new leader
+    val (newLeader, elected) = TestUtils.computeUntilTrue(partitionInfo.leader)(leader => leader != null)
+    assertTrue("Unclean leader not elected", elected)
+    assertEquals(followerBroker.config.brokerId, newLeader.id)
+
+    // New leader doesn't have the last 10 records committed on the old leader that have already been consumed.
+    // With unclean leader election enabled, we should be able to produce to the new leader. The first 10 records
+    // produced will not be consumed since they have offsets less than the consumer's committed offset.
+    // Next 10 records produced should be consumed.
+    (1 to 10).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
+      .map(producer.send)
+      .map(_.get(10, TimeUnit.SECONDS))
+    verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+    consumer.commitSync()
+  }
+
   @Test
   def testThreadPoolResize(): Unit = {
     val requestHandlerPrefix = "kafka-request-handler-"
@@ -1272,12 +1328,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
     private var _retries = 0
+    private var _acks = -1
 
     def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
+    def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
 
     override def build(): KafkaProducer[String, String] = {
       val producer = TestUtils.createNewProducer(bootstrapServers,
-        acks = -1,
+        acks = _acks,
         retries = _retries,
         securityProtocol = _securityProtocol,
         trustStoreFile = Some(trustStoreFile1),
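The test above flips the cluster-wide default through TestUtils.alterConfigs. For reference, a minimal AdminClient sketch of the same cluster-level update outside the test harness; the object name and bootstrap address are illustrative assumptions, not part of this patch:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

// Hypothetical helper (not part of the patch): enable the cluster-wide broker default for
// unclean.leader.election.enable, the same update the new test performs via TestUtils.alterConfigs.
object EnableUncleanLeaderElection {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
    val admin = AdminClient.create(props)
    try {
      // An empty resource name addresses the cluster-wide broker default, which is what
      // DynamicLogConfig.reconfigure observes on each broker.
      val clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "")
      val config = new Config(Collections.singleton(new ConfigEntry("unclean.leader.election.enable", "true")))
      admin.alterConfigs(Collections.singletonMap(clusterDefault, config)).all.get
    } finally {
      admin.close()
    }
  }
}

Once the broker that currently holds the controller role sees the default change from false to true, DynamicLogConfig.reconfigure calls enableDefaultUncleanLeaderElection, which enqueues the UncleanLeaderElectionEnable event and re-attempts leader election for partitions that are offline with no live ISR.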