Browse Source

KAFKA-6835: Enable topic unclean leader election to be enabled without controller change (#4957)

Reviewers: Jun Rao <junrao@gmail.com>
pull/5540/head
Manikumar Reddy O 6 years ago committed by Jun Rao
parent
commit
028e80204d
  1. 6
      core/src/main/scala/kafka/controller/ControllerState.scala
  2. 16
      core/src/main/scala/kafka/controller/KafkaController.scala
  3. 8
      core/src/main/scala/kafka/controller/PartitionStateMachine.scala
  4. 8
      core/src/main/scala/kafka/server/ConfigHandler.scala
  5. 2
      core/src/main/scala/kafka/server/KafkaServer.scala
  6. 72
      core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
  7. 8
      core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
  8. 16
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  9. 4
      docs/upgrade.html

6
core/src/main/scala/kafka/controller/ControllerState.scala

@ -94,7 +94,11 @@ object ControllerState {
def value = 13 def value = 13
} }
// Controller state reported by the topic-scoped TopicUncleanLeaderElectionEnable controller event.
case object TopicUncleanLeaderElectionEnable extends ControllerState {
// Numeric id of this state; it follows the preceding state's id of 13.
def value = 14
}
val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion, val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived, PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable) LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable)
} }

16
core/src/main/scala/kafka/controller/KafkaController.scala

@ -202,6 +202,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
eventManager.put(UncleanLeaderElectionEnable) eventManager.put(UncleanLeaderElectionEnable)
} }
/**
 * Queues a TopicUncleanLeaderElectionEnable event for the given topic on the controller's
 * event manager. Nothing is queued when this controller is not active.
 *
 * @param topic name of the topic carried by the queued event
 */
private[kafka] def enableTopicUncleanLeaderElection(topic: String): Unit =
  if (isActive) eventManager.put(TopicUncleanLeaderElectionEnable(topic))
private def state: ControllerState = eventManager.state private def state: ControllerState = eventManager.state
/** /**
@ -1025,6 +1031,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
} }
} }
/**
 * Controller event that asks the partition state machine to run its online-partition
 * state change for the partitions of one topic.
 *
 * @param topic topic handed to partitionStateMachine.triggerOnlinePartitionStateChange
 */
case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent {

  def state = ControllerState.TopicUncleanLeaderElectionEnable

  // The event is a no-op unless this controller is active at processing time.
  override def process(): Unit =
    if (isActive) partitionStateMachine.triggerOnlinePartitionStateChange(topic)
}
case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent { case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
def state = ControllerState.ControlledShutdown def state = ControllerState.ControlledShutdown

8
core/src/main/scala/kafka/controller/PartitionStateMachine.scala

@ -97,6 +97,14 @@ class PartitionStateMachine(config: KafkaConfig,
* state. This is called on a successful controller election and on broker changes * state. This is called on a successful controller election and on broker changes
*/ */
def triggerOnlinePartitionStateChange() { def triggerOnlinePartitionStateChange() {
triggerOnlinePartitionStateChange(partitionState.toMap)
}
/**
 * Topic-scoped variant: snapshots the tracked partition states whose partition belongs to
 * `topic` and passes that snapshot to the map-based overload.
 *
 * @param topic only partitions of this topic are considered
 */
def triggerOnlinePartitionStateChange(topic: String) {
  val statesForTopic = partitionState.filter { case (tp, _) => tp.topic.equals(topic) }.toMap
  triggerOnlinePartitionStateChange(statesForTopic)
}
def triggerOnlinePartitionStateChange(partitionState: Map[TopicPartition, PartitionState]) {
// try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
// that belong to topics to be deleted // that belong to topics to be deleted
val partitionsToTrigger = partitionState.filter { case (partition, partitionState) => val partitionsToTrigger = partitionState.filter { case (partition, partitionState) =>

8
core/src/main/scala/kafka/server/ConfigHandler.scala

@ -21,6 +21,7 @@ import java.util.Properties
import DynamicConfig.Broker._ import DynamicConfig.Broker._
import kafka.api.ApiVersion import kafka.api.ApiVersion
import kafka.controller.KafkaController
import kafka.log.{LogConfig, LogManager} import kafka.log.{LogConfig, LogManager}
import kafka.security.CredentialProvider import kafka.security.CredentialProvider
import kafka.server.Constants._ import kafka.server.Constants._
@ -33,6 +34,7 @@ import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.common.utils.Sanitizer
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.Try
/** /**
* The ConfigHandler is used to process config change notifications received by the DynamicConfigManager * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
@ -45,7 +47,7 @@ trait ConfigHandler {
* The TopicConfigHandler will process topic config changes in ZK. * The TopicConfigHandler will process topic config changes in ZK.
* The callback provides the topic name and the full properties set read from ZK * The callback provides the topic name and the full properties set read from ZK
*/ */
class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers) extends ConfigHandler with Logging { class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging {
def processConfigChanges(topic: String, topicConfig: Properties) { def processConfigChanges(topic: String, topicConfig: Properties) {
// Validate the configurations. // Validate the configurations.
@ -74,6 +76,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
} }
updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader) updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower) updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower)
if (Try(topicConfig.getProperty(KafkaConfig.UncleanLeaderElectionEnableProp).toBoolean).getOrElse(false)) {
kafkaController.enableTopicUncleanLeaderElection(topic)
}
} }
def parseThrottledPartitions(topicConfig: Properties, brokerId: Int, prop: String): Seq[Int] = { def parseThrottledPartitions(topicConfig: Properties, brokerId: Int, prop: String): Seq[Int] = {

2
core/src/main/scala/kafka/server/KafkaServer.scala

@ -304,7 +304,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
config.dynamicConfig.addReconfigurables(this) config.dynamicConfig.addReconfigurables(this)
/* start dynamic config manager */ /* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers), dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider), ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)) ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

72
core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala

@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.junit.Assert._ import org.junit.Assert._
class UncleanLeaderElectionTest extends ZooKeeperTestHarness { class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@ -238,7 +239,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// message production and consumption should both fail while leader is down // message production and consumption should both fail while leader is down
try { try {
produceMessage(servers, topic, "third") produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000)
fail("Message produced while leader is down should fail, but it succeeded") fail("Message produced while leader is down should fail, but it succeeded")
} catch { } catch {
case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
@ -280,4 +281,73 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
TestUtils.consumeRecords(consumer, numMessages).map(_.value) TestUtils.consumeRecords(consumer, numMessages).map(_.value)
} finally consumer.close() } finally consumer.close()
} }
/**
 * Verifies that setting KafkaConfig.UncleanLeaderElectionEnableProp to "true" as a topic
 * config through the AdminClient results in the lagging follower being elected leader
 * while the previous leader is down.
 *
 * Flow: write "first" with both replicas up, stop the follower, write "second", stop the
 * leader and restart the follower (no leader may be elected yet), then enable the topic
 * config and expect exactly one unclean election and the loss of "second".
 */
@Test
def testTopicUncleanLeaderElectionEnable(): Unit = {
  // unclean leader election is disabled by default
  startBrokers(Seq(configProps1, configProps2))

  // create topic with 1 partition, 2 replicas, one on each broker
  adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))

  // wait until leader is elected
  val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)

  // the non-leader broker is the follower
  val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1

  produceMessage(servers, topic, "first")
  waitUntilMetadataIsPropagated(servers, topic, partitionId)
  assertEquals(List("first"), consumeAllMessages(topic, 1))

  // shutdown follower server
  servers.filter(server => server.config.brokerId == followerId).foreach(server => shutdownServer(server))

  produceMessage(servers, topic, "second")
  assertEquals(List("first", "second"), consumeAllMessages(topic, 2))

  // remove any previous unclean election metric
  servers.foreach(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))

  // shutdown leader and then restart follower
  servers.filter(server => server.config.brokerId == leaderId).foreach(server => shutdownServer(server))
  val followerServer = servers.find(_.config.brokerId == followerId).get
  followerServer.startup()

  assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())

  // message production and consumption should both fail while leader is down
  try {
    produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000)
    fail("Message produced while leader is down should fail, but it succeeded")
  } catch {
    case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
  }

  assertEquals(List.empty[String], consumeAllMessages(topic, 0))

  // Enable unclean leader election for topic
  val newProps = new Properties
  newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
  val adminClient = createAdminClient()
  // close the client even when the alter-config call fails, so a failing test does not leak it
  try {
    TestUtils.alterTopicConfigs(adminClient, topic, newProps).all.get
  } finally {
    adminClient.close()
  }

  // wait until new leader is (uncleanly) elected
  waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
  assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())

  produceMessage(servers, topic, "third")

  // second message was lost due to unclean election
  assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
}
/**
 * Creates an AdminClient bootstrapped against the test brokers' "PLAINTEXT" listener, with
 * the metadata max age set to "10". The client is returned open; the caller closes it.
 */
private def createAdminClient(): AdminClient = {
  val plaintextListener = new ListenerName("PLAINTEXT")
  val clientProps = new Properties
  clientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.bootstrapServers(servers, plaintextListener))
  clientProps.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
  AdminClient.create(clientProps)
}
} }

8
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala

@ -233,7 +233,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test @Test
def shouldParseReplicationQuotaProperties(): Unit = { def shouldParseReplicationQuotaProperties(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
val props: Properties = new Properties() val props: Properties = new Properties()
//Given //Given
@ -246,7 +246,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test @Test
def shouldParseWildcardReplicationQuotaProperties(): Unit = { def shouldParseWildcardReplicationQuotaProperties(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
val props: Properties = new Properties() val props: Properties = new Properties()
//Given //Given
@ -261,7 +261,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test @Test
def shouldParseReplicationQuotaReset(): Unit = { def shouldParseReplicationQuotaReset(): Unit = {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
val props: Properties = new Properties() val props: Properties = new Properties()
//Given //Given
@ -276,7 +276,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test @Test
def shouldParseRegardlessOfWhitespaceAroundValues() { def shouldParseRegardlessOfWhitespaceAroundValues() {
val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
assertEquals(AllReplicas, parse(configHandler, "* ")) assertEquals(AllReplicas, parse(configHandler, "* "))
assertEquals(Seq(), parse(configHandler, " ")) assertEquals(Seq(), parse(configHandler, " "))
assertEquals(Seq(6), parse(configHandler, "6:102")) assertEquals(Seq(6), parse(configHandler, "6:102"))

16
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -943,11 +943,16 @@ object TestUtils extends Logging {
values values
} }
def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) { def produceMessage(servers: Seq[KafkaServer], topic: String, message: String,
val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers)) deliveryTimeoutMs: Int = 30 * 1000, requestTimeoutMs: Int = 20 * 1000) {
val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers),
deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs)
try {
producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
} finally {
producer.close() producer.close()
} }
}
def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) { def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
@ -1306,6 +1311,13 @@ object TestUtils extends Logging {
adminClient.alterConfigs(configs) adminClient.alterConfigs(configs)
} }
/**
 * Submits an alterConfigs request for a single topic.
 *
 * @param adminClient  client used to send the request
 * @param topic        name of the topic resource being altered
 * @param topicConfigs key/value pairs, each turned into one ConfigEntry of the new Config
 * @return the AlterConfigsResult returned by the client; this method does not wait on it
 */
def alterTopicConfigs(adminClient: AdminClient, topic: String, topicConfigs: Properties): AlterConfigsResult = {
  val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
  val entries = topicConfigs.asScala.toList.map { case (key, value) => new ConfigEntry(key, value) }
  adminClient.alterConfigs(Map(topicResource -> new Config(entries.asJava)).asJava)
}
/** /**
* Capture the console output during the execution of the provided function. * Capture the console output during the execution of the provided function.
*/ */

4
docs/upgrade.html

@ -41,6 +41,10 @@
Even though the old <code>Describe Cluster</code> access is still supported for backward compatibility, using it for this API is not advised.</li> Even though the old <code>Describe Cluster</code> access is still supported for backward compatibility, using it for this API is not advised.</li>
</ol> </ol>
<h5><a id="upgrade_210_notable" href="#upgrade_210_notable">Notable changes in 2.1.0</a></h5>
<ul>
<li>Unclean leader election is automatically enabled by the controller when the <code>unclean.leader.election.enable</code> config is dynamically updated using a per-topic config override.</li>
</ul>
<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0</a></h4> <h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0</a></h4>
<p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, <p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,

Loading…
Cancel
Save