Browse Source

KAFKA-1948; Fix ConsumerTest.testPartitionReassignmentCallback handling issue; reviewed by Gwen Shapira

pull/1442/head
Guozhang Wang 10 years ago
parent
commit
a3d6dcaf1b
  1. 2
      core/src/main/scala/kafka/controller/KafkaController.scala
  2. 5
      core/src/test/scala/integration/kafka/api/ConsumerTest.scala

2
core/src/main/scala/kafka/controller/KafkaController.scala

@ -365,6 +365,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt @@ -365,6 +365,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
controllerContext.epoch=0
controllerContext.epochZkVersion=0
brokerState.newState(RunningAsBroker)
info("Broker %d resigned as the controller".format(config.brokerId))
}
}

5
core/src/test/scala/integration/kafka/api/ConsumerTest.scala

@ -230,7 +230,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -230,7 +230,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertEquals(1, parts.size)
assertNotNull(parts(0).leader())
// shutdown the co-ordinator
// shutdown the coordinator
val coordinator = parts(0).leader().id()
this.servers(coordinator).shutdown()
@ -239,6 +239,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -239,6 +239,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
consumer0.poll(50)
assertEquals(2, callback.callsToAssigned)
assertEquals(2, callback.callsToRevoked)
// restart the coordinator since it may also be hosting "test" topic
this.servers(coordinator).startup()
consumer0.close()
}

Loading…
Cancel
Save