Browse Source

KAFKA-6526: Enable unclean leader election without controller change (#4920)

Enable dynamic update of default unclean leader election config of brokers. A new controller event has been added to process unclean leader election when the config is enabled dynamically.

Reviewers: Dong Lin <lindong28@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
pull/4945/merge
Rajini Sivaram 7 years ago committed by GitHub
parent
commit
b4d8552218
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      core/src/main/scala/kafka/controller/ControllerState.scala
  2. 14
      core/src/main/scala/kafka/controller/KafkaController.scala
  3. 8
      core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
  4. 62
      core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

6
core/src/main/scala/kafka/controller/ControllerState.scala

@@ -90,7 +90,11 @@ object ControllerState {
def value = 12
}
// Controller state reported while processing the UncleanLeaderElectionEnable event,
// i.e. when unclean leader election has just been enabled via a dynamic config update.
case object UncleanLeaderElectionEnable extends ControllerState {
def value = 13
}
val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
LogDirChange, ControllerShutdown)
LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable)
}

14
core/src/main/scala/kafka/controller/KafkaController.scala

@@ -195,6 +195,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
zkClient.updateBrokerInfoInZk(newBrokerInfo)
}
/**
 * Queues an [[UncleanLeaderElectionEnable]] event for asynchronous processing on the
 * controller event thread, so that offline partitions are re-evaluated after the broker
 * default unclean.leader.election.enable config is turned on dynamically.
 * Safe to call on any broker: the event's process() is a no-op on non-active controllers.
 */
private[kafka] def enableDefaultUncleanLeaderElection(): Unit =
  eventManager.put(UncleanLeaderElectionEnable)
private def state: ControllerState = eventManager.state
/**
@@ -1009,6 +1013,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
// Controller event fired when the broker-default unclean.leader.election.enable config
// is enabled dynamically. Re-runs the online partition state change so that offline
// partitions with no in-sync replicas get a chance to elect an (unclean) leader
// without requiring a controller failover.
case object UncleanLeaderElectionEnable extends ControllerEvent {
  def state = ControllerState.UncleanLeaderElectionEnable

  override def process(): Unit = {
    // Only the active controller may trigger leader election; others ignore the event.
    if (isActive)
      partitionStateMachine.triggerOnlinePartitionStateChange()
  }
}
case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
def state = ControllerState.ControlledShutdown

8
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

@@ -156,7 +156,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
@@ -501,7 +501,7 @@ object DynamicLogConfig {
val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- ExcludedConfigs
val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case (k, v) => (v, k) }
}
class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Logging {
class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Reconfigurable with Logging {
override def configure(configs: util.Map[String, _]): Unit = {}
@@ -517,6 +517,7 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
override def reconfigure(configs: util.Map[String, _]): Unit = {
val currentLogConfig = logManager.currentDefaultConfig
val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
if (v != null) {
@@ -536,6 +537,9 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
val logConfig = LogConfig(props.asJava)
log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
}
if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
server.kafkaController.enableDefaultUncleanLeaderElection()
}
}
}

62
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

@@ -43,7 +43,7 @@ import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.SslConfigs._
import org.apache.kafka.common.config.types.Password
@@ -429,6 +429,62 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
/**
 * Verifies that enabling unclean.leader.election.enable dynamically (as a cluster-wide
 * default via AdminClient alterConfigs) triggers an unclean leader election without a
 * controller change: a partition left with no ISR gets a leader again once the config
 * flips to true, and produce/consume resumes against the new (out-of-sync) leader.
 */
@Test
def testUncleanLeaderElectionEnable(): Unit = {
val topic = "testtopic2"
// Replication factor 2 so that shutting down one replica leaves exactly one copy.
TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
// acks=1 so produces succeed while the follower is down; high retries to ride out leader moves.
val producer = ProducerBuilder().maxRetries(1000).acks(1).build()
val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
consumer.commitSync()
// Re-describes the topic on every call so the latest leader/replica state is observed.
def partitionInfo: TopicPartitionInfo =
adminClients.head.describeTopics(Collections.singleton(topic)).values.get(topic).get().partitions().get(0)
val partitionInfo0 = partitionInfo
// Sanity check: the preferred (first) replica is currently the leader.
assertEquals(partitionInfo0.replicas.get(0), partitionInfo0.leader)
val leaderBroker = servers.find(_.config.brokerId == partitionInfo0.replicas.get(0).id).get
val followerBroker = servers.find(_.config.brokerId == partitionInfo0.replicas.get(1).id).get
// Stop follower
followerBroker.shutdown()
followerBroker.awaitShutdown()
// Produce and consume some messages when the only follower is down, this should succeed since MinIsr is 1
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
consumer.commitSync()
// Shutdown leader and startup follower
// The restarted follower is now out of sync: it is missing the 10 records produced above.
leaderBroker.shutdown()
leaderBroker.awaitShutdown()
followerBroker.startup()
val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
// Verify that new leader is not elected with unclean leader disabled since there are no ISRs
TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean leader elected")
// Enable unclean leader election
val newProps = new Properties
newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
// perBrokerConfig = false: update the cluster-wide default, not a single broker's config.
TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get
waitForConfigOnServer(controller, KafkaConfig.UncleanLeaderElectionEnableProp, "true")
// Verify that the old follower with missing records is elected as the new leader
val (newLeader, elected) = TestUtils.computeUntilTrue(partitionInfo.leader)(leader => leader != null)
assertTrue("Unclean leader not elected", elected)
assertEquals(followerBroker.config.brokerId, newLeader.id)
// New leader doesn't have the last 10 records committed on the old leader that have already been consumed.
// With unclean leader election enabled, we should be able to produce to the new leader. The first 10 records
// produced will not be consumed since they have offsets less than the consumer's committed offset.
// Next 10 records produced should be consumed.
(1 to 10).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
.map(producer.send)
.map(_.get(10, TimeUnit.SECONDS))
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
consumer.commitSync()
}
@Test
def testThreadPoolResize(): Unit = {
val requestHandlerPrefix = "kafka-request-handler-"
@@ -1272,12 +1328,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
private var _retries = 0
private var _acks = -1
/** Sets the producer's `retries` config; returns this builder for call chaining. */
def maxRetries(retries: Int): ProducerBuilder = {
  _retries = retries
  this
}
/** Sets the producer's `acks` config; returns this builder for call chaining. */
def acks(acks: Int): ProducerBuilder = {
  _acks = acks
  this
}
override def build(): KafkaProducer[String, String] = {
val producer = TestUtils.createNewProducer(bootstrapServers,
acks = -1,
acks = _acks,
retries = _retries,
securityProtocol = _securityProtocol,
trustStoreFile = Some(trustStoreFile1),

Loading…
Cancel
Save