diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 024506cd005..86422bfd512 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -32,7 +32,7 @@ import scala.collection.JavaConversions import com.yammer.metrics.core.Gauge object LogAppendInfo { - val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, -1, -1, false) + val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, NoCompressionCodec, -1, -1, false) } /** @@ -41,10 +41,11 @@ object LogAppendInfo { * @param lastOffset The last offset in the message set * @param shallowCount The number of shallow messages * @param validBytes The number of valid bytes - * @param codec The codec used in the message set + * @param sourceCodec The source codec used in the message set(coming from producer) + * @param targetCodec The target codec of the message set(after applying broker compression logic) * @param offsetsMonotonic Are the offsets in this message set monotonically increasing */ -case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) +case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) /** @@ -287,7 +288,7 @@ class Log(val dir: File, // assign offsets to the message set val offset = new AtomicLong(nextOffsetMetadata.messageOffset) try { - validMessages = validMessages.assignOffsets(offset, appendInfo.codec) + validMessages = validMessages.assignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec) } catch { case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e) } @@ -360,7 +361,7 @@ class Log(val dir: File, var shallowMessageCount = 0 var validBytesCount = 0 var firstOffset, lastOffset = -1L - var codec: CompressionCodec = NoCompressionCodec + var sourceCodec: CompressionCodec = NoCompressionCodec var monotonic = true for(messageAndOffset <- messages.shallowIterator) { // update the first offset if on the first message @@ -388,14 +389,18 @@ class Log(val dir: File, shallowMessageCount += 1 validBytesCount += messageSize - + val messageCodec = m.compressionCodec if(messageCodec != NoCompressionCodec) - codec = messageCodec + sourceCodec = messageCodec } - LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic) + + //Apply if any broker-side compression + val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) + + LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic) } - + /** * Trim any invalid bytes from the end of this message set (if there are any) * @param messages The message set to trim diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index ca7a99e99f6..2338b4410ca 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -19,9 +19,11 @@ package kafka.log import java.util.Properties import org.apache.kafka.common.utils.Utils - import scala.collection._ import org.apache.kafka.common.config.ConfigDef +import kafka.common._ +import scala.collection.JavaConversions._ +import kafka.message.BrokerCompressionCodec object Defaults { val SegmentSize = 1024 * 1024 @@ -40,6 +42,7 @@ object Defaults { val Compact = false val UncleanLeaderElectionEnable = true val MinInSyncReplicas = 1 + val CompressionType = "producer" } /** @@ -59,6 +62,7 @@ object Defaults { * @param compact Should old segments in this log be deleted or deduplicated? * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks + * @param compressionType compressionType for a given topic * */ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, @@ -76,7 +80,8 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio, val compact: Boolean = Defaults.Compact, val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, - val minInSyncReplicas: Int = Defaults.MinInSyncReplicas) { + val minInSyncReplicas: Int = Defaults.MinInSyncReplicas, + val compressionType: String = Defaults.CompressionType) { def toProps: Properties = { val props = new Properties() @@ -97,6 +102,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, props.put(CleanupPolicyProp, if(compact) "compact" else "delete") props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) props.put(MinInSyncReplicasProp, minInSyncReplicas.toString) + props.put(CompressionTypeProp, compressionType) props } @@ -125,6 +131,7 @@ object LogConfig { val CleanupPolicyProp = "cleanup.policy" val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" val MinInSyncReplicasProp = "min.insync.replicas" + val CompressionTypeProp = "compression.type" val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled" @@ -145,6 +152,10 @@ object LogConfig { val UncleanLeaderElectionEnableDoc = "Indicates whether unclean leader election is enabled" val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop accepting writes with" + " -1 (or all) required acks" + val CompressionTypeDoc = "This parameter allows you to specify the compression logic for a given topic. This config" + + " is used to retain/remove/change the compression set by the producer. This config takes the following options: " + + " uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what the producer sets, the broker" + + " writes the message decompressed. producer means the broker attempts to retain whatever is used by the producer" private val configDef = { import ConfigDef.Range._ @@ -174,6 +185,7 @@ object LogConfig { .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, MEDIUM, UncleanLeaderElectionEnableDoc) .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc) + .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(seqAsJavaList(BrokerCompressionCodec.brokerCompressionOptions)), MEDIUM, CompressionTypeDoc) } def configNames() = { @@ -181,6 +193,7 @@ object LogConfig { configDef.names().toList.sorted } + /** * Parse the given properties instance into a LogConfig object */ @@ -202,7 +215,8 @@ object LogConfig { minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double], compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete, uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean], - minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int]) + minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int], + compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase()) } /** diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 788c7864bc8..f46ad5cbbba 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -194,16 +194,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi innerIter.next } } - + } } - + /** * Update the offsets for this message set. This method attempts to do an in-place conversion * if there is no compression, but otherwise recopies the messages */ - private[kafka] def assignOffsets(offsetCounter: AtomicLong, codec: CompressionCodec): ByteBufferMessageSet = { - if(codec == NoCompressionCodec) { + private[kafka] def assignOffsets(offsetCounter: AtomicLong, sourceCodec: CompressionCodec, targetCodec: CompressionCodec): ByteBufferMessageSet = { + if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // do an in-place conversion var position = 0 buffer.mark() @@ -217,16 +217,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi } else { // messages are compressed, crack open the messageset and recompress with correct offset val messages = this.internalIterator(isShallow = false).map(_.message) - new ByteBufferMessageSet(compressionCodec = codec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*) + new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*) } } - + /** * The total number of bytes in this message set, including any partial trailing messages */ def sizeInBytes: Int = buffer.limit - + /** * The total number of bytes in this message set not including any partial, trailing messages */ diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala index 9439d2bc29a..cacde9bca0e 100644 --- a/core/src/main/scala/kafka/message/CompressionCodec.scala +++ b/core/src/main/scala/kafka/message/CompressionCodec.scala @@ -38,29 +38,58 @@ object CompressionCodec { } } +object BrokerCompressionCodec { + + val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec) + val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name) + + def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains((compressionType.toLowerCase())) + + def getCompressionCodec(compressionType: String): CompressionCodec = { + compressionType.toLowerCase match { + case UncompressedCodec.name => NoCompressionCodec + case _ => CompressionCodec.getCompressionCodec(compressionType) + } + } + + def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec): CompressionCodec = { + if (ProducerCompressionCodec.name.equals(compressionType)) producerCompression + else getCompressionCodec(compressionType) + } +} + sealed trait CompressionCodec { def codec: Int; def name: String } +sealed trait BrokerCompressionCodec { def name: String } -case object DefaultCompressionCodec extends CompressionCodec { +case object DefaultCompressionCodec extends CompressionCodec with BrokerCompressionCodec { val codec = GZIPCompressionCodec.codec val name = GZIPCompressionCodec.name } -case object GZIPCompressionCodec extends CompressionCodec { +case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec { val codec = 1 val name = "gzip" } -case object SnappyCompressionCodec extends CompressionCodec { +case object SnappyCompressionCodec extends CompressionCodec with BrokerCompressionCodec { val codec = 2 val name = "snappy" } -case object LZ4CompressionCodec extends CompressionCodec { +case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionCodec { val codec = 3 val name = "lz4" } -case object NoCompressionCodec extends CompressionCodec { +case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec { val codec = 0 val name = "none" } + +case object UncompressedCodec extends BrokerCompressionCodec { + val name = "uncompressed" +} + +case object ProducerCompressionCodec extends BrokerCompressionCodec { + val name = "producer" +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index bbd3fd75e83..9d1adec06a4 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -21,6 +21,8 @@ import java.util.Properties import kafka.message.{MessageSet, Message} import kafka.consumer.ConsumerConfig import kafka.utils.{VerifiableProperties, ZKConfig, Utils} +import kafka.message.NoCompressionCodec +import kafka.message.BrokerCompressionCodec /** * Configuration settings for the kafka server @@ -345,4 +347,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */ val deleteTopicEnable = props.getBoolean("delete.topic.enable", false) + /** + * This parameter allows you to specify the broker-side compression logic. This config is used to + * retain/remove/change the compression set by the producer. This config takes the following options: + * uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what the producer sets, the broker + * writes the message decompressed. producer means the broker attempts to retain whatever is used by the producer" + */ + val compressionType = props.getString("compression.type", "producer").toLowerCase() + require(BrokerCompressionCodec.isValid(compressionType), "compression.type : "+compressionType + " is not valid." + + " Valid options are "+BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a069eb9272c..89200da30a0 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -329,7 +329,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg deleteRetentionMs = config.logCleanerDeleteRetentionMs, fileDeleteDelayMs = config.logDeleteDelayMs, minCleanableRatio = config.logCleanerMinCleanRatio, - compact = config.logCleanupPolicy.trim.toLowerCase == "compact") + compact = config.logCleanupPolicy.trim.toLowerCase == "compact", + compressionType = config.compressionType) val defaultProps = defaultLogConfig.toProps val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) // read the log configurations from zookeeper diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala index 99b0df7b69c..fe5bd9d9155 100644 --- a/core/src/test/scala/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/kafka/log/LogConfigTest.scala @@ -34,6 +34,7 @@ class LogConfigTest extends JUnit3Suite { Assert.assertEquals(4242, config.segmentSize) Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize) + Assert.assertEquals("producer", config.compressionType) } @Test @@ -50,6 +51,7 @@ class LogConfigTest extends JUnit3Suite { LogConfig.configNames().foreach((name) => { name match { case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false")) + case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer","uncompressed","gzip")) case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete)) case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1)) case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString) diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala new file mode 100644 index 00000000000..fa4a8ad0acc --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import java.io.File +import kafka.utils._ +import kafka.message._ +import org.scalatest.junit.JUnitSuite +import org.junit._ +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import java.util.{ Collection, ArrayList } +import kafka.server.KafkaConfig +import org.apache.kafka.common.record.CompressionType +import scala.collection.JavaConversions._ + +@RunWith(value = classOf[Parameterized]) +class BrokerCompressionTest(messageCompression: String, brokerCompression: String) extends JUnitSuite { + + var logDir: File = null + val time = new MockTime(0) + val logConfig = LogConfig() + + @Before + def setUp() { + logDir = TestUtils.tempDir() + } + + @After + def tearDown() { + Utils.rm(logDir) + } + + /** + * Test broker-side compression configuration + */ + @Test + def testBrokerSideCompression() { + val messageCompressionCode = CompressionCodec.getCompressionCodec(messageCompression) + + /*configure broker-side compression */ + val log = new Log(logDir, logConfig.copy(compressionType = brokerCompression), recoveryPoint = 0L, time.scheduler, time = time) + + /* append two messages */ + log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes), new Message("there".getBytes))) + + def readMessage(offset: Int) = log.read(offset, 4096).messageSet.head.message + + if (!brokerCompression.equals("producer")) { + val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression) + assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name, brokerCompressionCode, readMessage(0).compressionCodec) + } + else + assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode, readMessage(0).compressionCodec) + + } + +} + +object BrokerCompressionTest { + @Parameters + def parameters: Collection[Array[String]] = { + for (brokerCompression <- BrokerCompressionCodec.brokerCompressionOptions; + messageCompression <- CompressionType.values + ) yield Array(messageCompression.name, brokerCompression) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index 4e45d965bc4..716254aa338 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -147,11 +147,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { // check uncompressed offsets checkOffsets(messages, 0) var offset = 1234567 - checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec), offset) - + checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec , NoCompressionCodec), offset) + // check compressed messages checkOffsets(compressedMessages, 0) - checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec), offset) + checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec , DefaultCompressionCodec), offset) } /* check that offsets are assigned based on byte offset from the given base offset */ diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2377abe4933..82dce80d553 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -21,6 +21,8 @@ import org.junit.Test import junit.framework.Assert._ import org.scalatest.junit.JUnit3Suite import kafka.utils.TestUtils +import kafka.message.GZIPCompressionCodec +import kafka.message.NoCompressionCodec class KafkaConfigTest extends JUnit3Suite { @@ -180,6 +182,30 @@ class KafkaConfigTest extends JUnit3Suite { assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis ) } - + @Test + def testDefaultCompressionType() { + val props = TestUtils.createBrokerConfig(0, 8181) + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.compressionType, "producer") + } + + @Test + def testValidCompressionType() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("compression.type", "gzip") + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.compressionType, "gzip") + } + + @Test + def testInvalidCompressionType() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("compression.type", "abc") + intercept[IllegalArgumentException] { + new KafkaConfig(props) + } + } }