|
|
|
@ -26,7 +26,6 @@ import java.util.Properties
@@ -26,7 +26,6 @@ import java.util.Properties
|
|
|
|
|
import java.util.concurrent.ExecutionException |
|
|
|
|
|
|
|
|
|
import kafka.admin.AdminUtils |
|
|
|
|
import kafka.common.FailedToSendMessageException |
|
|
|
|
import kafka.consumer.{Consumer, ConsumerConfig} |
|
|
|
|
import kafka.serializer.StringDecoder |
|
|
|
|
import kafka.server.{KafkaConfig, KafkaServer} |
|
|
|
@ -67,9 +66,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -67,9 +66,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
|
|
|
|
|
configProps2 = createBrokerConfig(brokerId2, zkConnect) |
|
|
|
|
|
|
|
|
|
for (configProps <- List(configProps1, configProps2)) { |
|
|
|
|
configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown)) |
|
|
|
|
configProps.put("controlled.shutdown.max.retries", String.valueOf(1)) |
|
|
|
|
configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000)) |
|
|
|
|
configProps.put("controlled.shutdown.enable", enableControlledShutdown.toString) |
|
|
|
|
configProps.put("controlled.shutdown.max.retries", "1") |
|
|
|
|
configProps.put("controlled.shutdown.retry.backoff.ms", "1000") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// temporarily set loggers to a higher level so that tests run quietly |
|
|
|
@ -104,7 +103,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -104,7 +103,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
def testUncleanLeaderElectionEnabled { |
|
|
|
|
// unclean leader election is enabled by default |
|
|
|
|
// enable unclean leader election |
|
|
|
|
configProps1.put("unclean.leader.election.enable", "true") |
|
|
|
|
configProps2.put("unclean.leader.election.enable", "true") |
|
|
|
|
startBrokers(Seq(configProps1, configProps2)) |
|
|
|
|
|
|
|
|
|
// create topic with 1 partition, 2 replicas, one on each broker |
|
|
|
@ -115,9 +116,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -115,9 +116,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
def testUncleanLeaderElectionDisabled { |
|
|
|
|
// disable unclean leader election |
|
|
|
|
configProps1.put("unclean.leader.election.enable", String.valueOf(false)) |
|
|
|
|
configProps2.put("unclean.leader.election.enable", String.valueOf(false)) |
|
|
|
|
// unclean leader election is disabled by default |
|
|
|
|
startBrokers(Seq(configProps1, configProps2)) |
|
|
|
|
|
|
|
|
|
// create topic with 1 partition, 2 replicas, one on each broker |
|
|
|
@ -129,13 +128,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -129,13 +128,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
|
|
|
|
|
@Test |
|
|
|
|
def testUncleanLeaderElectionEnabledByTopicOverride { |
|
|
|
|
// disable unclean leader election globally, but enable for our specific test topic |
|
|
|
|
configProps1.put("unclean.leader.election.enable", String.valueOf(false)) |
|
|
|
|
configProps2.put("unclean.leader.election.enable", String.valueOf(false)) |
|
|
|
|
configProps1.put("unclean.leader.election.enable", "false") |
|
|
|
|
configProps2.put("unclean.leader.election.enable", "false") |
|
|
|
|
startBrokers(Seq(configProps1, configProps2)) |
|
|
|
|
|
|
|
|
|
// create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled |
|
|
|
|
val topicProps = new Properties() |
|
|
|
|
topicProps.put("unclean.leader.election.enable", String.valueOf(true)) |
|
|
|
|
topicProps.put("unclean.leader.election.enable", "true") |
|
|
|
|
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), |
|
|
|
|
topicProps) |
|
|
|
|
|
|
|
|
@ -145,13 +144,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@@ -145,13 +144,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
|
|
|
|
|
@Test |
|
|
|
|
def testCleanLeaderElectionDisabledByTopicOverride { |
|
|
|
|
// enable unclean leader election globally, but disable for our specific test topic |
|
|
|
|
configProps1.put("unclean.leader.election.enable", String.valueOf(true)) |
|
|
|
|
configProps2.put("unclean.leader.election.enable", String.valueOf(true)) |
|
|
|
|
configProps1.put("unclean.leader.election.enable", "true") |
|
|
|
|
configProps2.put("unclean.leader.election.enable", "true") |
|
|
|
|
startBrokers(Seq(configProps1, configProps2)) |
|
|
|
|
|
|
|
|
|
// create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled |
|
|
|
|
val topicProps = new Properties() |
|
|
|
|
topicProps.put("unclean.leader.election.enable", String.valueOf(false)) |
|
|
|
|
topicProps.put("unclean.leader.election.enable", "false") |
|
|
|
|
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), |
|
|
|
|
topicProps) |
|
|
|
|
|
|
|
|
|