Browse Source

remove support for format for magic byte 0 in 0.8; patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-461

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1375367 13f79535-47bb-0310-9956-ffa450edef68
0.8.0-beta1-candidate1
Jun Rao 12 years ago
parent
commit
7baccc6c33
  1. 36
      core/src/main/scala/kafka/message/Message.scala
  2. BIN
      core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka
  3. 73
      core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala

36
core/src/main/scala/kafka/message/Message.scala

@ -25,8 +25,7 @@ import kafka.common.UnknownMagicByteException @@ -25,8 +25,7 @@ import kafka.common.UnknownMagicByteException
* Message byte offsets
*/
object Message {
val MagicVersion1: Byte = 0
val MagicVersion2: Byte = 1
val MagicVersion: Byte = 1
val CurrentMagicValue: Byte = 1
val MagicOffset = 0
val MagicLength = 1
@ -43,13 +42,10 @@ object Message { @@ -43,13 +42,10 @@ object Message {
/**
* Computes the CRC value based on the magic byte
* @param magic Specifies the magic byte value. Possible values are 0 and 1
* 0 for no compression
* 1 for compression
*/
* @param magic Specifies the magic byte value. The only value allowed currently is 1.
*/
def crcOffset(magic: Byte): Int = magic match {
case MagicVersion1 => MagicOffset + MagicLength
case MagicVersion2 => AttributeOffset + AttributeLength
case MagicVersion => AttributeOffset + AttributeLength
case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic))
}
@ -57,40 +53,26 @@ object Message { @@ -57,40 +53,26 @@ object Message {
/**
* Computes the offset to the message payload based on the magic byte
* @param magic Specifies the magic byte value. Possible values are 0 and 1
* 0 for no compression
* 1 for compression
* @param magic Specifies the magic byte value. The only value allowed currently is 1.
*/
def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength
/**
* Computes the size of the message header based on the magic byte
* @param magic Specifies the magic byte value. Possible values are 0 and 1
* 0 for no compression
* 1 for compression
* @param magic Specifies the magic byte value. The only value allowed currently is 1.
*/
def headerSize(magic: Byte): Int = payloadOffset(magic)
/**
* Size of the header for magic byte 0. This is the minimum size of any message header
* Size of the header for magic byte 1. This is the minimum size of any message header.
*/
val MinHeaderSize = headerSize(0);
val MinHeaderSize = headerSize(1);
}
/**
* A message. The format of an N byte message is the following:
*
* If magic byte is 0
*
* 1. 1 byte "magic" identifier to allow format changes
*
* 2. 4 byte CRC32 of the payload
*
* 3. N - 5 byte payload
*
* If magic byte is 1
*
* 1. 1 byte "magic" identifier to allow format changes
* 1. 1 byte "magic" identifier to allow format changes, whose value is 1 currently
*
* 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
*

BIN
core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka

Binary file not shown.

73
core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala

@ -1,73 +0,0 @@ @@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.integration
import junit.framework.Assert._
import java.util.Properties
import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.consumer.SimpleConsumer
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
val topic = "MagicByte0"
val group = "default_group"
val testConsumer = "consumer"
val kafkaProps = new Properties
val host = "localhost"
val port = TestUtils.choosePort
val loader = getClass.getClassLoader
val kafkaLogDir = loader.getResource("test-kafka-logs")
kafkaProps.put("brokerid", "12")
kafkaProps.put("port", port.toString)
kafkaProps.put("log.dir", kafkaLogDir.getPath)
kafkaProps.put("zk.connect", zkConnect.toString)
val configs = List(new KafkaConfig(kafkaProps))
var simpleConsumer: SimpleConsumer = null
override def setUp() {
super.setUp()
simpleConsumer = new SimpleConsumer(host, port, 1000000, 64*1024)
}
override def tearDown() {
simpleConsumer.close
super.tearDown
}
// test for reading data with magic byte 0
def testProtocolVersion0() {
CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
var fetchOffset: Long = 0L
var messageCount: Int = 0
while(fetchOffset < lastOffset(0)) {
val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, fetchOffset, 10000).build())
val fetchedMessages = fetched.messageSet(topic, 0)
fetchedMessages.foreach(m => fetchOffset = m.offset)
messageCount += fetchedMessages.size
}
assertEquals(100, messageCount)
}
}
Loading…
Cancel
Save