From 17283a377dc9b2026b3eecdf1d9a32908b3bfc90 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Sat, 25 Aug 2012 00:24:49 +0000 Subject: [PATCH] Controller tests throw several zookeeper errors; patched by Yang Ye; reviewed by Jun Rao; KAFKA-416 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1377161 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/api/OffsetResponse.scala | 11 + .../scala/kafka/api/ProducerRequest.scala | 2 +- .../scala/kafka/api/ProducerResponse.scala | 15 +- .../scala/kafka/api/StopReplicaResponse.scala | 2 +- .../controller/ControllerBasicTest.scala | 104 --------- .../network/RpcDataSerializationTest.scala | 206 ++++++++++++++++++ .../scala/unit/kafka/utils/TestUtils.scala | 42 ---- 7 files changed, 233 insertions(+), 149 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala create mode 100644 core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala index 56d169ce8e0..61d344072d6 100644 --- a/core/src/main/scala/kafka/api/OffsetResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetResponse.scala @@ -46,4 +46,15 @@ case class OffsetResponse(versionId: Short, buffer.putInt(offsets.length) offsets.foreach(buffer.putLong(_)) } + + // need to override case-class equals due to broken java-array equals() + override def equals(other: Any): Boolean = { + other match { + case that: OffsetResponse => + ( versionId == that.versionId && + errorCode == that.errorCode && + offsets.toSeq == that.offsets.toSeq) + case _ => false + } + } } diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index a350c6ab12f..d7767b57c5d 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -83,7 +83,7 @@ case class ProducerRequest( versionId: Short, } } - def sizeInBytes: Int = { + def sizeInBytes(): Int = { var size = 0 //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4 diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 06e8625e689..dc110e0104a 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -40,7 +40,7 @@ object ProducerResponse { } } -case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], +case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length) @@ -58,4 +58,17 @@ case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array buffer.putInt(offsets.length) offsets.foreach(buffer.putLong(_)) } + + // need to override case-class equals due to broken java-array equals() + override def equals(other: Any): Boolean = { + other match { + case that: ProducerResponse => + ( correlationId == that.correlationId && + versionId == that.versionId && + errorCode == that.errorCode && + errors.toSeq == that.errors.toSeq && + offsets.toSeq == that.offsets.toSeq) + case _ => false + } + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala index 80bed172431..29e5209e714 100644 --- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -45,7 +45,7 @@ object StopReplicaResponse { case class StopReplicaResponse(val versionId: Short, val responseMap: Map[(String, Int), Short], val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ - def sizeInBytes: Int ={ + def sizeInBytes(): Int ={ var size = 2 + 2 + 4 for ((key, value) <- responseMap){ size += (2 + key._1.length) + 4 + 2 diff --git a/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala deleted file mode 100644 index 40f47b801cc..00000000000 --- a/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.controller - -import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness -import kafka.utils.TestUtils._ -import junit.framework.Assert._ -import kafka.server.{KafkaServer, KafkaConfig} -import kafka.api._ -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicInteger -import kafka.admin.CreateTopicCommand -import kafka.utils.{ZkUtils, ControllerTestUtils, TestUtils} - - -class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness { - val props = createBrokerConfigs(4) - val configs = props.map(p => new KafkaConfig(p)) - var brokers: Seq[KafkaServer] = null - - override def setUp() { - super.setUp() - brokers = configs.map(config => TestUtils.createServer(config)) - CreateTopicCommand.createTopic(zkClient, "test1", 1, 4, "0:1:2:3") - CreateTopicCommand.createTopic(zkClient, "test2", 1, 4, "0:1:2:3") - } - - override def tearDown() { - brokers.foreach(_.shutdown()) - super.tearDown() - } - - def testControllerFailOver(){ - brokers(0).shutdown() - brokers(1).shutdown() - brokers(3).shutdown() - assertTrue("Controller not elected", TestUtils.waitUntilTrue(() => - ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 != null, zookeeper.tickTime)) - var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 - assertEquals("Controller should move to broker 2", "2", curController) - - - brokers(1).startup() - brokers(2).shutdown() - assertTrue("Controller not elected", TestUtils.waitUntilTrue(() => - ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 != null, zookeeper.tickTime)) - curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 - assertEquals("Controller should move to broker 1", "1", curController) - } - - def testControllerCommandSend(){ - for(broker <- brokers){ - if(broker.kafkaController.isActive){ - val leaderAndISRRequest = ControllerTestUtils.createTestLeaderAndISRRequest() - val stopReplicaRequest = ControllerTestUtils.createTestStopReplicaRequest() - - val successCount: AtomicInteger = new AtomicInteger(0) - val countDownLatch: CountDownLatch = new CountDownLatch(8) - - def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){ - val expectedResponse = ControllerTestUtils.createTestLeaderAndISRResponse() - if(response.equals(expectedResponse)) - successCount.addAndGet(1) - countDownLatch.countDown() - } - - def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){ - val expectedResponse = ControllerTestUtils.createTestStopReplicaResponse() - if(response.equals(expectedResponse)) - successCount.addAndGet(1) - countDownLatch.countDown() - } - - broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne) - broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne) - broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne) - broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne) - broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne) - broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne) - broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne) - broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne) - countDownLatch.await() - - assertEquals(successCount.get(), 8) - } - } - } -} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala b/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala new file mode 100644 index 00000000000..16057dc0967 --- /dev/null +++ b/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network; + +import org.junit._ +import org.scalatest.junit.JUnitSuite +import junit.framework.Assert._ +import java.nio.ByteBuffer +import kafka.api._ +import kafka.message.{Message, ByteBufferMessageSet} +import kafka.cluster.Broker +import kafka.common.ErrorMapping +import collection.mutable._ + +object RpcDataSerializationTestUtils{ + private val topic1 = "test1" + private val topic2 = "test2" + private val leader1 = 0; + private val isr1 = List(0, 1, 2) + private val leader2 = 0; + private val isr2 = List(0, 2, 3) + private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes))) + private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes))) + private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes))) + private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes))) + private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3) + private val topicData1 = new TopicData(topic1, partitionDataArray) + private val topicData2 = new TopicData(topic2, partitionDataArray) + private val topicDataArray = Array(topicData1, topicData2) + private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) + private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) + private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2) + private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3) + private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) + private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) + + def createTestLeaderAndISRRequest() : LeaderAndISRRequest = { + val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1) + val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2) + val map = Map(((topic1, 0), leaderAndISR1), + ((topic2, 0), leaderAndISR2)) + new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map) + } + + def createTestLeaderAndISRResponse() : LeaderAndISRResponse = { + val responseMap = Map(((topic1, 0), ErrorMapping.NoError), + ((topic2, 0), ErrorMapping.NoError)) + new LeaderAndISRResponse(1, responseMap) + } + + def createTestStopReplicaRequest() : StopReplicaRequest = { + new StopReplicaRequest(Set((topic1, 0), (topic2, 0))) + } + + def createTestStopReplicaResponse() : StopReplicaResponse = { + val responseMap = Map(((topic1, 0), ErrorMapping.NoError), + ((topic2, 0), ErrorMapping.NoError)) + new StopReplicaResponse(1, responseMap) + } + + def createTestProducerRequest: ProducerRequest = { + new ProducerRequest(1, "client 1", 0, 1000, topicDataArray) + } + + def createTestProducerResponse: ProducerResponse = { + new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0) + } + + def createTestFetchRequest: FetchRequest = { + new FetchRequest(offsetInfo = offsetDetailSeq) + } + + def createTestFetchResponse: FetchResponse = { + new FetchResponse(1, 1, topicDataArray) + } + + def createTestOffsetRequest: OffsetRequest = { + new OffsetRequest(topic1, 1, 1000, 200) + } + + def createTestOffsetResponse: OffsetResponse = { + new OffsetResponse(1, Array(1000l, 2000l, 3000l, 4000l)) + } + + def createTestTopicMetadataRequest: TopicMetadataRequest = { + new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2)) + } + + def createTestTopicMetadataResponse: TopicMetaDataResponse = { + new TopicMetaDataResponse(1, Seq(topicmetaData1, topicmetaData2)) + } +} + +class RpcDataSerializationTest extends JUnitSuite { + private val leaderAndISRRequest = RpcDataSerializationTestUtils.createTestLeaderAndISRRequest + private val leaderAndISRResponse = RpcDataSerializationTestUtils.createTestLeaderAndISRResponse + private val stopReplicaRequest = RpcDataSerializationTestUtils.createTestStopReplicaRequest + private val stopReplicaResponse = RpcDataSerializationTestUtils.createTestStopReplicaResponse + private val producerRequest = RpcDataSerializationTestUtils.createTestProducerRequest + private val producerResponse = RpcDataSerializationTestUtils.createTestProducerResponse + private val fetchRequest = RpcDataSerializationTestUtils.createTestFetchRequest + private val offsetRequest = RpcDataSerializationTestUtils.createTestOffsetRequest + private val offsetResponse = RpcDataSerializationTestUtils.createTestOffsetResponse + private val topicMetadataRequest = RpcDataSerializationTestUtils.createTestTopicMetadataRequest + private val topicMetadataResponse = RpcDataSerializationTestUtils.createTestTopicMetadataResponse + + + @Test + def testSerializationAndDeserialization() { + var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes()) + leaderAndISRRequest.writeTo(buffer) + buffer.rewind() + val deserializedLeaderAndISRRequest = LeaderAndISRRequest.readFrom(buffer) + assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest, + deserializedLeaderAndISRRequest) + + buffer = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes()) + leaderAndISRResponse.writeTo(buffer) + buffer.rewind() + val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(buffer) + assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndISRResponse, + deserializedLeaderAndISRResponse) + + buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes()) + stopReplicaRequest.writeTo(buffer) + buffer.rewind() + val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(buffer) + assertEquals("The original and deserialzed stopReplicaRequest should be the same", stopReplicaRequest, + deserializedStopReplicaRequest) + + buffer = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes()) + stopReplicaResponse.writeTo(buffer) + buffer.rewind() + val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(buffer) + assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse, + deserializedStopReplicaResponse) + + buffer = ByteBuffer.allocate(producerRequest.sizeInBytes()) + producerRequest.writeTo(buffer) + buffer.rewind() + val deserializedProducerRequest = ProducerRequest.readFrom(buffer) + assertEquals("The original and deserialzed producerRequest should be the same", producerRequest, + deserializedProducerRequest) + + buffer = ByteBuffer.allocate(producerResponse.sizeInBytes) + producerResponse.writeTo(buffer) + buffer.rewind() + val deserializedProducerResponse = ProducerResponse.readFrom(buffer) + assertEquals("The original and deserialzed producerResponse should be the same: [%s], [%s]".format(producerResponse, deserializedProducerResponse), producerResponse, + deserializedProducerResponse) + + buffer = ByteBuffer.allocate(fetchRequest.sizeInBytes) + fetchRequest.writeTo(buffer) + buffer.rewind() + val deserializedFetchRequest = FetchRequest.readFrom(buffer) + assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest, + deserializedFetchRequest) + + buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes()) + offsetRequest.writeTo(buffer) + buffer.rewind() + val deserializedOffsetRequest = OffsetRequest.readFrom(buffer) + assertEquals("The original and deserialzed offsetRequest should be the same", offsetRequest, + deserializedOffsetRequest) + + buffer = ByteBuffer.allocate(offsetResponse.sizeInBytes) + offsetResponse.writeTo(buffer) + buffer.rewind() + val deserializedOffsetResponse = OffsetResponse.readFrom(buffer) + assertEquals("The original and deserialzed offsetResponse should be the same", offsetResponse, + deserializedOffsetResponse) + + buffer = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes()) + topicMetadataRequest.writeTo(buffer) + buffer.rewind() + val deserializedTopicMetadataRequest = TopicMetadataRequest.readFrom(buffer) + assertEquals("The original and deserialzed topicMetadataRequest should be the same", topicMetadataRequest, + deserializedTopicMetadataRequest) + + buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes) + topicMetadataResponse.writeTo(buffer) + buffer.rewind() + val deserializedTopicMetadataResponse = TopicMetaDataResponse.readFrom(buffer) + assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse, + deserializedTopicMetadataResponse) + } +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 23bb486fed8..f18b7ab7166 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -471,48 +471,6 @@ object TestUtils extends Logging { } } -object ControllerTestUtils{ - def createTestLeaderAndISRRequest() : LeaderAndISRRequest = { - val topic1 = "test1" - val topic2 = "test2" - - val leader1 = 0; - val isr1 = List(0, 1, 2) - - val leader2 = 0; - val isr2 = List(0, 2, 3) - - val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1) - val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2) - val map = Map(((topic1, 0), leaderAndISR1), - ((topic2, 0), leaderAndISR2)) - new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map) - } - - def createTestLeaderAndISRResponse() : LeaderAndISRResponse = { - val topic1 = "test1" - val topic2 = "test2" - val responseMap = Map(((topic1, 0), ErrorMapping.NoError), - ((topic2, 0), ErrorMapping.NoError)) - new LeaderAndISRResponse(1, responseMap) - } - - - def createTestStopReplicaRequest() : StopReplicaRequest = { - val topic1 = "test1" - val topic2 = "test2" - new StopReplicaRequest(Set((topic1, 0), (topic2, 0))) - } - - def createTestStopReplicaResponse() : StopReplicaResponse = { - val topic1 = "test1" - val topic2 = "test2" - val responseMap = Map(((topic1, 0), ErrorMapping.NoError), - ((topic2, 0), ErrorMapping.NoError)) - new StopReplicaResponse(1, responseMap) - } -} - object TestZKUtils { val zookeeperConnect = "127.0.0.1:2182" }