diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index d2473058ac7..aa41c7f5458 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -94,7 +94,11 @@ object ControllerState {
def value = 13
}
+ case object TopicUncleanLeaderElectionEnable extends ControllerState {
+ def value = 14
+ }
+
val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
- LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable)
+ LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, TopicUncleanLeaderElectionEnable)
}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e397e80db66..f6ea43da062 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -202,6 +202,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
eventManager.put(UncleanLeaderElectionEnable)
}
+ private[kafka] def enableTopicUncleanLeaderElection(topic: String): Unit = {
+ if (isActive) {
+ eventManager.put(TopicUncleanLeaderElectionEnable(topic))
+ }
+ }
+
private def state: ControllerState = eventManager.state
/**
@@ -1025,6 +1031,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
+ case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent {
+
+ def state = ControllerState.TopicUncleanLeaderElectionEnable
+
+ override def process(): Unit = {
+ if (!isActive) return
+ partitionStateMachine.triggerOnlinePartitionStateChange(topic)
+ }
+ }
+
case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
def state = ControllerState.ControlledShutdown
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index db4c7161f35..11e38d46ffd 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -97,6 +97,14 @@ class PartitionStateMachine(config: KafkaConfig,
* state. This is called on a successful controller election and on broker changes
*/
def triggerOnlinePartitionStateChange() {
+ triggerOnlinePartitionStateChange(partitionState.toMap)
+ }
+
+ def triggerOnlinePartitionStateChange(topic: String) {
+ triggerOnlinePartitionStateChange(partitionState.filterKeys(p => p.topic.equals(topic)).toMap)
+ }
+
+ def triggerOnlinePartitionStateChange(partitionState: Map[TopicPartition, PartitionState]) {
// try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
// that belong to topics to be deleted
val partitionsToTrigger = partitionState.filter { case (partition, partitionState) =>
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 78c3abf164a..5593225f0e6 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import DynamicConfig.Broker._
import kafka.api.ApiVersion
+import kafka.controller.KafkaController
import kafka.log.{LogConfig, LogManager}
import kafka.security.CredentialProvider
import kafka.server.Constants._
@@ -33,6 +34,7 @@ import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer
import scala.collection.JavaConverters._
+import scala.util.Try
/**
* The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
@@ -45,7 +47,7 @@ trait ConfigHandler {
* The TopicConfigHandler will process topic config changes in ZK.
* The callback provides the topic name and the full properties set read from ZK
*/
-class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers) extends ConfigHandler with Logging {
+class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging {
def processConfigChanges(topic: String, topicConfig: Properties) {
// Validate the configurations.
@@ -74,6 +76,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
}
updateThrottledList(LogConfig.LeaderReplicationThrottledReplicasProp, quotas.leader)
updateThrottledList(LogConfig.FollowerReplicationThrottledReplicasProp, quotas.follower)
+
+ if (Try(topicConfig.getProperty(KafkaConfig.UncleanLeaderElectionEnableProp).toBoolean).getOrElse(false)) {
+ kafkaController.enableTopicUncleanLeaderElection(topic)
+ }
}
def parseThrottledPartitions(topicConfig: Properties, brokerId: Int, prop: String): Seq[Int] = {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0ac877d3b4a..c2a49a15297 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -304,7 +304,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
config.dynamicConfig.addReconfigurables(this)
/* start dynamic config manager */
- dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
+ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 0250cfbee82..89fcebf5b1a 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.junit.Assert._
class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -238,7 +239,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// message production and consumption should both fail while leader is down
try {
- produceMessage(servers, topic, "third")
+ produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000)
fail("Message produced while leader is down should fail, but it succeeded")
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
@@ -280,4 +281,73 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
TestUtils.consumeRecords(consumer, numMessages).map(_.value)
} finally consumer.close()
}
+
+ @Test
+ def testTopicUncleanLeaderElectionEnable(): Unit = {
+ // unclean leader election is disabled by default
+ startBrokers(Seq(configProps1, configProps2))
+
+ // create topic with 1 partition, 2 replicas, one on each broker
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+
+ // wait until leader is elected
+ val leaderId = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+
+ // the non-leader broker is the follower
+ val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
+
+ produceMessage(servers, topic, "first")
+ waitUntilMetadataIsPropagated(servers, topic, partitionId)
+ assertEquals(List("first"), consumeAllMessages(topic, 1))
+
+ // shutdown follower server
+ servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+
+ produceMessage(servers, topic, "second")
+ assertEquals(List("first", "second"), consumeAllMessages(topic, 2))
+
+ //remove any previous unclean election metric
+ servers.map(server => server.kafkaController.controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec"))
+
+ // shutdown leader and then restart follower
+ servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+ val followerServer = servers.find(_.config.brokerId == followerId).get
+ followerServer.startup()
+
+ assertEquals(0, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
+
+ // message production and consumption should both fail while leader is down
+ try {
+ produceMessage(servers, topic, "third", deliveryTimeoutMs = 1000, requestTimeoutMs = 1000)
+ fail("Message produced while leader is down should fail, but it succeeded")
+ } catch {
+ case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected
+ }
+
+ assertEquals(List.empty[String], consumeAllMessages(topic, 0))
+
+ // Enable unclean leader election for topic
+ val adminClient = createAdminClient()
+ val newProps = new Properties
+ newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+ TestUtils.alterTopicConfigs(adminClient, topic, newProps).all.get
+ adminClient.close()
+
+ // wait until new leader is (uncleanly) elected
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
+ assertEquals(1, followerServer.kafkaController.controllerContext.stats.uncleanLeaderElectionRate.count())
+
+ produceMessage(servers, topic, "third")
+
+ // second message was lost due to unclean election
+ assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
+ }
+
+ private def createAdminClient(): AdminClient = {
+ val config = new Properties
+ val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+ config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
+ AdminClient.create(config)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index f5c5c9b2802..510c4a3e273 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -233,7 +233,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test
def shouldParseReplicationQuotaProperties(): Unit = {
- val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+ val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
val props: Properties = new Properties()
//Given
@@ -246,7 +246,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test
def shouldParseWildcardReplicationQuotaProperties(): Unit = {
- val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+ val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
val props: Properties = new Properties()
//Given
@@ -261,7 +261,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test
def shouldParseReplicationQuotaReset(): Unit = {
- val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+ val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
val props: Properties = new Properties()
//Given
@@ -276,7 +276,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
@Test
def shouldParseRegardlessOfWhitespaceAroundValues() {
- val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null)
+ val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null, null)
assertEquals(AllReplicas, parse(configHandler, "* "))
assertEquals(Seq(), parse(configHandler, " "))
assertEquals(Seq(6), parse(configHandler, "6:102"))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1e6f022df86..2ca3a6c986d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -943,10 +943,15 @@ object TestUtils extends Logging {
values
}
- def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) {
- val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers))
- producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
- producer.close()
+ def produceMessage(servers: Seq[KafkaServer], topic: String, message: String,
+ deliveryTimeoutMs: Int = 30 * 1000, requestTimeoutMs: Int = 20 * 1000) {
+ val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers),
+ deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs)
+ try {
+ producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
+ } finally {
+ producer.close()
+ }
}
def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
@@ -1306,6 +1311,13 @@ object TestUtils extends Logging {
adminClient.alterConfigs(configs)
}
+ def alterTopicConfigs(adminClient: AdminClient, topic: String, topicConfigs: Properties): AlterConfigsResult = {
+ val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
+ val newConfig = new Config(configEntries)
+ val configs = Map(new ConfigResource(ConfigResource.Type.TOPIC, topic) -> newConfig).asJava
+ adminClient.alterConfigs(configs)
+ }
+
/**
* Capture the console output during the execution of the provided function.
*/
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 979190db7b7..3b7ee853454 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -41,6 +41,10 @@
Even though the old Describe Cluster
access is still supported for backward compatibility, using it for this API is not advised.
+                <li>The <code>unclean.leader.election.enable</code> config can now be dynamically updated by using a per-topic config override.</li>
 Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,