diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 66df6d2fbdb..e9b4dc62df3 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -365,6 +365,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt controllerContext.epoch=0 controllerContext.epochZkVersion=0 brokerState.newState(RunningAsBroker) + + info("Broker %d resigned as the controller".format(config.brokerId)) } } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 798f035df52..2802a399bf5 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -230,7 +230,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertEquals(1, parts.size) assertNotNull(parts(0).leader()) - // shutdown the co-ordinator + // shutdown the coordinator val coordinator = parts(0).leader().id() this.servers(coordinator).shutdown() @@ -239,6 +239,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.poll(50) assertEquals(2, callback.callsToAssigned) assertEquals(2, callback.callsToRevoked) + + // restart the coordinator since it may also be hosting "test" topic + this.servers(coordinator).startup() consumer0.close() }