diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 272a0b6b8a4..37680dc801a 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -25,8 +25,7 @@ import kafka.common.UnknownMagicByteException * Message byte offsets */ object Message { - val MagicVersion1: Byte = 0 - val MagicVersion2: Byte = 1 + val MagicVersion: Byte = 1 val CurrentMagicValue: Byte = 1 val MagicOffset = 0 val MagicLength = 1 @@ -43,13 +42,10 @@ object Message { /** * Computes the CRC value based on the magic byte - * @param magic Specifies the magic byte value. Possible values are 0 and 1 - * 0 for no compression - * 1 for compression - */ + * @param magic Specifies the magic byte value. The only value allowed currently is 1. + */ def crcOffset(magic: Byte): Int = magic match { - case MagicVersion1 => MagicOffset + MagicLength - case MagicVersion2 => AttributeOffset + AttributeLength + case MagicVersion => AttributeOffset + AttributeLength case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic)) } @@ -57,40 +53,26 @@ object Message { /** * Computes the offset to the message payload based on the magic byte - * @param magic Specifies the magic byte value. Possible values are 0 and 1 - * 0 for no compression - * 1 for compression + * @param magic Specifies the magic byte value. The only value allowed currently is 1. */ def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength /** * Computes the size of the message header based on the magic byte - * @param magic Specifies the magic byte value. Possible values are 0 and 1 - * 0 for no compression - * 1 for compression + * @param magic Specifies the magic byte value. The only value allowed currently is 1. */ def headerSize(magic: Byte): Int = payloadOffset(magic) /** - * Size of the header for magic byte 0. This is the minimum size of any message header + * Size of the header for magic byte 1. This is the minimum size of any message header. */ - val MinHeaderSize = headerSize(0); + val MinHeaderSize = headerSize(1); } /** * A message. The format of an N byte message is the following: * - * If magic byte is 0 - * - * 1. 1 byte "magic" identifier to allow format changes - * - * 2. 4 byte CRC32 of the payload - * - * 3. N - 5 byte payload - * - * If magic byte is 1 - * - * 1. 1 byte "magic" identifier to allow format changes + * 1. 1 byte "magic" identifier to allow format changes, whose value is 1 currently * * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) * diff --git a/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka b/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka deleted file mode 100644 index e5002587f96..00000000000 Binary files a/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka and /dev/null differ diff --git a/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala b/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala deleted file mode 100644 index 255fdb60140..00000000000 --- a/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.integration - -import junit.framework.Assert._ -import java.util.Properties - -import kafka.api.{FetchRequestBuilder, OffsetRequest} -import kafka.consumer.SimpleConsumer -import kafka.server.KafkaConfig -import kafka.utils.TestUtils -import org.scalatest.junit.JUnit3Suite -import kafka.admin.CreateTopicCommand - -class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness { - - val topic = "MagicByte0" - val group = "default_group" - val testConsumer = "consumer" - val kafkaProps = new Properties - val host = "localhost" - val port = TestUtils.choosePort - val loader = getClass.getClassLoader - val kafkaLogDir = loader.getResource("test-kafka-logs") - kafkaProps.put("brokerid", "12") - kafkaProps.put("port", port.toString) - kafkaProps.put("log.dir", kafkaLogDir.getPath) - kafkaProps.put("zk.connect", zkConnect.toString) - val configs = List(new KafkaConfig(kafkaProps)) - var simpleConsumer: SimpleConsumer = null - - override def setUp() { - super.setUp() - simpleConsumer = new SimpleConsumer(host, port, 1000000, 64*1024) - } - - override def tearDown() { - simpleConsumer.close - super.tearDown - } - - // test for reading data with magic byte 0 - def testProtocolVersion0() { - CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) - val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1) - var fetchOffset: Long = 0L - var messageCount: Int = 0 - - while(fetchOffset < lastOffset(0)) { - val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, fetchOffset, 10000).build()) - val fetchedMessages = fetched.messageSet(topic, 0) - fetchedMessages.foreach(m => fetchOffset = m.offset) - messageCount += fetchedMessages.size - } - assertEquals(100, messageCount) - } -}