Browse Source

Controller tests throw several zookeeper errors; patched by Yang Ye; reviewed by Jun Rao; KAFKA-416

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1377161 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
17283a377d
  1. 11
      core/src/main/scala/kafka/api/OffsetResponse.scala
  2. 2
      core/src/main/scala/kafka/api/ProducerRequest.scala
  3. 15
      core/src/main/scala/kafka/api/ProducerResponse.scala
  4. 2
      core/src/main/scala/kafka/api/StopReplicaResponse.scala
  5. 104
      core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
  6. 206
      core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
  7. 42
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

11
core/src/main/scala/kafka/api/OffsetResponse.scala

@ -46,4 +46,15 @@ case class OffsetResponse(versionId: Short, @@ -46,4 +46,15 @@ case class OffsetResponse(versionId: Short,
buffer.putInt(offsets.length)
offsets.foreach(buffer.putLong(_))
}
// need to override case-class equals due to broken java-array equals()
override def equals(other: Any): Boolean = {
other match {
case that: OffsetResponse =>
( versionId == that.versionId &&
errorCode == that.errorCode &&
offsets.toSeq == that.offsets.toSeq)
case _ => false
}
}
}

2
core/src/main/scala/kafka/api/ProducerRequest.scala

@ -83,7 +83,7 @@ case class ProducerRequest( versionId: Short, @@ -83,7 +83,7 @@ case class ProducerRequest( versionId: Short,
}
}
def sizeInBytes: Int = {
def sizeInBytes(): Int = {
var size = 0
//size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4

15
core/src/main/scala/kafka/api/ProducerResponse.scala

@ -40,7 +40,7 @@ object ProducerResponse { @@ -40,7 +40,7 @@ object ProducerResponse {
}
}
case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
@ -58,4 +58,17 @@ case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array @@ -58,4 +58,17 @@ case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array
buffer.putInt(offsets.length)
offsets.foreach(buffer.putLong(_))
}
// need to override case-class equals due to broken java-array equals()
override def equals(other: Any): Boolean = {
other match {
case that: ProducerResponse =>
( correlationId == that.correlationId &&
versionId == that.versionId &&
errorCode == that.errorCode &&
errors.toSeq == that.errors.toSeq &&
offsets.toSeq == that.offsets.toSeq)
case _ => false
}
}
}

2
core/src/main/scala/kafka/api/StopReplicaResponse.scala

@ -45,7 +45,7 @@ object StopReplicaResponse { @@ -45,7 +45,7 @@ object StopReplicaResponse {
case class StopReplicaResponse(val versionId: Short,
val responseMap: Map[(String, Int), Short],
val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
def sizeInBytes: Int ={
def sizeInBytes(): Int ={
var size = 2 + 2 + 4
for ((key, value) <- responseMap){
size += (2 + key._1.length) + 4 + 2

104
core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala

@ -1,104 +0,0 @@ @@ -1,104 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.server.{KafkaServer, KafkaConfig}
import kafka.api._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import kafka.admin.CreateTopicCommand
import kafka.utils.{ZkUtils, ControllerTestUtils, TestUtils}
class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(4)
val configs = props.map(p => new KafkaConfig(p))
var brokers: Seq[KafkaServer] = null
override def setUp() {
super.setUp()
brokers = configs.map(config => TestUtils.createServer(config))
CreateTopicCommand.createTopic(zkClient, "test1", 1, 4, "0:1:2:3")
CreateTopicCommand.createTopic(zkClient, "test2", 1, 4, "0:1:2:3")
}
override def tearDown() {
brokers.foreach(_.shutdown())
super.tearDown()
}
def testControllerFailOver(){
brokers(0).shutdown()
brokers(1).shutdown()
brokers(3).shutdown()
assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 != null, zookeeper.tickTime))
var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
assertEquals("Controller should move to broker 2", "2", curController)
brokers(1).startup()
brokers(2).shutdown()
assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 != null, zookeeper.tickTime))
curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
assertEquals("Controller should move to broker 1", "1", curController)
}
def testControllerCommandSend(){
for(broker <- brokers){
if(broker.kafkaController.isActive){
val leaderAndISRRequest = ControllerTestUtils.createTestLeaderAndISRRequest()
val stopReplicaRequest = ControllerTestUtils.createTestStopReplicaRequest()
val successCount: AtomicInteger = new AtomicInteger(0)
val countDownLatch: CountDownLatch = new CountDownLatch(8)
def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){
val expectedResponse = ControllerTestUtils.createTestLeaderAndISRResponse()
if(response.equals(expectedResponse))
successCount.addAndGet(1)
countDownLatch.countDown()
}
def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){
val expectedResponse = ControllerTestUtils.createTestStopReplicaResponse()
if(response.equals(expectedResponse))
successCount.addAndGet(1)
countDownLatch.countDown()
}
broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
countDownLatch.await()
assertEquals(successCount.get(), 8)
}
}
}
}

206
core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala

@ -0,0 +1,206 @@ @@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.network;
import org.junit._
import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._
import java.nio.ByteBuffer
import kafka.api._
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.cluster.Broker
import kafka.common.ErrorMapping
import collection.mutable._
object RpcDataSerializationTestUtils{
private val topic1 = "test1"
private val topic2 = "test2"
private val leader1 = 0;
private val isr1 = List(0, 1, 2)
private val leader2 = 0;
private val isr2 = List(0, 2, 3)
private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3)
private val topicData1 = new TopicData(topic1, partitionDataArray)
private val topicData2 = new TopicData(topic2, partitionDataArray)
private val topicDataArray = Array(topicData1, topicData2)
private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2)
private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
def createTestLeaderAndISRRequest() : LeaderAndISRRequest = {
val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1)
val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2)
val map = Map(((topic1, 0), leaderAndISR1),
((topic2, 0), leaderAndISR2))
new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map)
}
def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
((topic2, 0), ErrorMapping.NoError))
new LeaderAndISRResponse(1, responseMap)
}
def createTestStopReplicaRequest() : StopReplicaRequest = {
new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
}
def createTestStopReplicaResponse() : StopReplicaResponse = {
val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
((topic2, 0), ErrorMapping.NoError))
new StopReplicaResponse(1, responseMap)
}
def createTestProducerRequest: ProducerRequest = {
new ProducerRequest(1, "client 1", 0, 1000, topicDataArray)
}
def createTestProducerResponse: ProducerResponse = {
new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0)
}
def createTestFetchRequest: FetchRequest = {
new FetchRequest(offsetInfo = offsetDetailSeq)
}
def createTestFetchResponse: FetchResponse = {
new FetchResponse(1, 1, topicDataArray)
}
def createTestOffsetRequest: OffsetRequest = {
new OffsetRequest(topic1, 1, 1000, 200)
}
def createTestOffsetResponse: OffsetResponse = {
new OffsetResponse(1, Array(1000l, 2000l, 3000l, 4000l))
}
def createTestTopicMetadataRequest: TopicMetadataRequest = {
new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
}
def createTestTopicMetadataResponse: TopicMetaDataResponse = {
new TopicMetaDataResponse(1, Seq(topicmetaData1, topicmetaData2))
}
}
class RpcDataSerializationTest extends JUnitSuite {
private val leaderAndISRRequest = RpcDataSerializationTestUtils.createTestLeaderAndISRRequest
private val leaderAndISRResponse = RpcDataSerializationTestUtils.createTestLeaderAndISRResponse
private val stopReplicaRequest = RpcDataSerializationTestUtils.createTestStopReplicaRequest
private val stopReplicaResponse = RpcDataSerializationTestUtils.createTestStopReplicaResponse
private val producerRequest = RpcDataSerializationTestUtils.createTestProducerRequest
private val producerResponse = RpcDataSerializationTestUtils.createTestProducerResponse
private val fetchRequest = RpcDataSerializationTestUtils.createTestFetchRequest
private val offsetRequest = RpcDataSerializationTestUtils.createTestOffsetRequest
private val offsetResponse = RpcDataSerializationTestUtils.createTestOffsetResponse
private val topicMetadataRequest = RpcDataSerializationTestUtils.createTestTopicMetadataRequest
private val topicMetadataResponse = RpcDataSerializationTestUtils.createTestTopicMetadataResponse
@Test
def testSerializationAndDeserialization() {
var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes())
leaderAndISRRequest.writeTo(buffer)
buffer.rewind()
val deserializedLeaderAndISRRequest = LeaderAndISRRequest.readFrom(buffer)
assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest,
deserializedLeaderAndISRRequest)
buffer = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes())
leaderAndISRResponse.writeTo(buffer)
buffer.rewind()
val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(buffer)
assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndISRResponse,
deserializedLeaderAndISRResponse)
buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes())
stopReplicaRequest.writeTo(buffer)
buffer.rewind()
val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(buffer)
assertEquals("The original and deserialzed stopReplicaRequest should be the same", stopReplicaRequest,
deserializedStopReplicaRequest)
buffer = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes())
stopReplicaResponse.writeTo(buffer)
buffer.rewind()
val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(buffer)
assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse,
deserializedStopReplicaResponse)
buffer = ByteBuffer.allocate(producerRequest.sizeInBytes())
producerRequest.writeTo(buffer)
buffer.rewind()
val deserializedProducerRequest = ProducerRequest.readFrom(buffer)
assertEquals("The original and deserialzed producerRequest should be the same", producerRequest,
deserializedProducerRequest)
buffer = ByteBuffer.allocate(producerResponse.sizeInBytes)
producerResponse.writeTo(buffer)
buffer.rewind()
val deserializedProducerResponse = ProducerResponse.readFrom(buffer)
assertEquals("The original and deserialzed producerResponse should be the same: [%s], [%s]".format(producerResponse, deserializedProducerResponse), producerResponse,
deserializedProducerResponse)
buffer = ByteBuffer.allocate(fetchRequest.sizeInBytes)
fetchRequest.writeTo(buffer)
buffer.rewind()
val deserializedFetchRequest = FetchRequest.readFrom(buffer)
assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest,
deserializedFetchRequest)
buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes())
offsetRequest.writeTo(buffer)
buffer.rewind()
val deserializedOffsetRequest = OffsetRequest.readFrom(buffer)
assertEquals("The original and deserialzed offsetRequest should be the same", offsetRequest,
deserializedOffsetRequest)
buffer = ByteBuffer.allocate(offsetResponse.sizeInBytes)
offsetResponse.writeTo(buffer)
buffer.rewind()
val deserializedOffsetResponse = OffsetResponse.readFrom(buffer)
assertEquals("The original and deserialzed offsetResponse should be the same", offsetResponse,
deserializedOffsetResponse)
buffer = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes())
topicMetadataRequest.writeTo(buffer)
buffer.rewind()
val deserializedTopicMetadataRequest = TopicMetadataRequest.readFrom(buffer)
assertEquals("The original and deserialzed topicMetadataRequest should be the same", topicMetadataRequest,
deserializedTopicMetadataRequest)
buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
topicMetadataResponse.writeTo(buffer)
buffer.rewind()
val deserializedTopicMetadataResponse = TopicMetaDataResponse.readFrom(buffer)
assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
deserializedTopicMetadataResponse)
}
}

42
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -471,48 +471,6 @@ object TestUtils extends Logging { @@ -471,48 +471,6 @@ object TestUtils extends Logging {
}
}
object ControllerTestUtils{
def createTestLeaderAndISRRequest() : LeaderAndISRRequest = {
val topic1 = "test1"
val topic2 = "test2"
val leader1 = 0;
val isr1 = List(0, 1, 2)
val leader2 = 0;
val isr2 = List(0, 2, 3)
val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1)
val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2)
val map = Map(((topic1, 0), leaderAndISR1),
((topic2, 0), leaderAndISR2))
new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map)
}
def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
val topic1 = "test1"
val topic2 = "test2"
val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
((topic2, 0), ErrorMapping.NoError))
new LeaderAndISRResponse(1, responseMap)
}
def createTestStopReplicaRequest() : StopReplicaRequest = {
val topic1 = "test1"
val topic2 = "test2"
new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
}
def createTestStopReplicaResponse() : StopReplicaResponse = {
val topic1 = "test1"
val topic2 = "test2"
val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
((topic2, 0), ErrorMapping.NoError))
new StopReplicaResponse(1, responseMap)
}
}
object TestZKUtils {
val zookeeperConnect = "127.0.0.1:2182"
}

Loading…
Cancel
Save