
KAFKA-1499; Broker-side compression configuration; reviewed by Joel Koshy

Manikumar Reddy authored 10 years ago; committed by Joel Koshy
commit 1c8f89bc73
  1. core/src/main/scala/kafka/log/Log.scala (23)
  2. core/src/main/scala/kafka/log/LogConfig.scala (20)
  3. core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (14)
  4. core/src/main/scala/kafka/message/CompressionCodec.scala (39)
  5. core/src/main/scala/kafka/server/KafkaConfig.scala (11)
  6. core/src/main/scala/kafka/server/KafkaServer.scala (3)
  7. core/src/test/scala/kafka/log/LogConfigTest.scala (2)
  8. core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala (84)
  9. core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (6)
  10. core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala (28)

core/src/main/scala/kafka/log/Log.scala (23)

@@ -32,7 +32,7 @@ import scala.collection.JavaConversions
import com.yammer.metrics.core.Gauge
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, -1, -1, false)
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
}
/**
@@ -41,10 +41,11 @@ object LogAppendInfo {
* @param lastOffset The last offset in the message set
* @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
* @param codec The codec used in the message set
* @param sourceCodec The source codec used in the message set (coming from the producer)
* @param targetCodec The target codec of the message set (after applying broker compression logic)
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
*/
case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
/**
@@ -287,7 +288,7 @@ class Log(val dir: File,
// assign offsets to the message set
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
try {
validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
validMessages = validMessages.assignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
@@ -360,7 +361,7 @@ class Log(val dir: File,
var shallowMessageCount = 0
var validBytesCount = 0
var firstOffset, lastOffset = -1L
var codec: CompressionCodec = NoCompressionCodec
var sourceCodec: CompressionCodec = NoCompressionCodec
var monotonic = true
for(messageAndOffset <- messages.shallowIterator) {
// update the first offset if on the first message
@@ -388,14 +389,18 @@ class Log(val dir: File,
shallowMessageCount += 1
validBytesCount += messageSize
val messageCodec = m.compressionCodec
if(messageCodec != NoCompressionCodec)
codec = messageCodec
sourceCodec = messageCodec
}
LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic)
// Apply broker-side compression, if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
LogAppendInfo(firstOffset, lastOffset, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
/**
* Trim any invalid bytes from the end of this message set (if there are any)
* @param messages The message set to trim
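The net effect of the Log.scala change is that each append now tracks two codecs: the one the producer used and the one the broker will write with. A minimal sketch of that bookkeeping, with made-up offsets, byte counts, and codecs purely for illustration:

    import kafka.message._
    import kafka.log.LogAppendInfo

    // Hypothetical case: a snappy-compressed produce request arrives at a
    // broker/topic whose compression.type resolves to gzip.
    val info = LogAppendInfo(
      firstOffset = 0L, lastOffset = 1L,
      sourceCodec = SnappyCompressionCodec,  // codec set by the producer
      targetCodec = GZIPCompressionCodec,    // codec resolved from config.compressionType
      shallowCount = 1, validBytes = 64, offsetsMonotonic = true)

    // assignOffsets(offset, info.sourceCodec, info.targetCodec) then recompresses
    // the set with gzip, because the two codecs are not both NoCompressionCodec.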

core/src/main/scala/kafka/log/LogConfig.scala (20)

@@ -19,9 +19,11 @@ package kafka.log
import java.util.Properties
import org.apache.kafka.common.utils.Utils
import scala.collection._
import org.apache.kafka.common.config.ConfigDef
import kafka.common._
import scala.collection.JavaConversions._
import kafka.message.BrokerCompressionCodec
object Defaults {
val SegmentSize = 1024 * 1024
@@ -40,6 +42,7 @@ object Defaults {
val Compact = false
val UncleanLeaderElectionEnable = true
val MinInSyncReplicas = 1
val CompressionType = "producer"
}
/**
@@ -59,6 +62,7 @@ object Defaults {
* @param compact Should old segments in this log be deleted or deduplicated?
* @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled
* @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
* @param compressionType The compression type for a given topic
*
*/
case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
@@ -76,7 +80,8 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
val compact: Boolean = Defaults.Compact,
val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
val minInSyncReplicas: Int = Defaults.MinInSyncReplicas) {
val minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
val compressionType: String = Defaults.CompressionType) {
def toProps: Properties = {
val props = new Properties()
@@ -97,6 +102,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
props.put(CompressionTypeProp, compressionType)
props
}
@@ -125,6 +131,7 @@ object LogConfig {
val CleanupPolicyProp = "cleanup.policy"
val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
val CompressionTypeProp = "compression.type"
val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
@@ -145,6 +152,10 @@ object LogConfig {
val UncleanLeaderElectionEnableDoc = "Indicates whether unclean leader election is enabled"
val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop accepting writes with" +
" -1 (or all) required acks"
val CompressionTypeDoc = "This parameter allows you to specify the compression logic for a given topic. This config" +
" is used to retain/remove/change the compression set by the producer. This config takes the following options: " +
" uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what the producer sets, the broker" +
" writes the message decompressed. producer means the broker attempts to retain whatever is used by the producer"
private val configDef = {
import ConfigDef.Range._
@@ -174,6 +185,7 @@ object LogConfig {
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
MEDIUM, UncleanLeaderElectionEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
.define(CompressionTypeProp, STRING, Defaults.CompressionType, in(seqAsJavaList(BrokerCompressionCodec.brokerCompressionOptions)), MEDIUM, CompressionTypeDoc)
}
def configNames() = {
@@ -181,6 +193,7 @@ object LogConfig {
configDef.names().toList.sorted
}
/**
* Parse the given properties instance into a LogConfig object
*/
@@ -202,7 +215,8 @@ object LogConfig {
minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete,
uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int])
minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int],
compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String].toLowerCase())
}
/**
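For context (not part of the diff), the new topic-level knob would be exercised roughly as follows; this sketch assumes the two-argument LogConfig.fromProps(defaults, overrides) form used in KafkaServer.scala below:

    import java.util.Properties
    import kafka.log.LogConfig

    val defaults = LogConfig().toProps                    // compression.type defaults to "producer"
    val overrides = new Properties()
    overrides.put(LogConfig.CompressionTypeProp, "gzip")  // per-topic override
    val topicConfig = LogConfig.fromProps(defaults, overrides)
    // topicConfig.compressionType == "gzip"; a value outside
    // BrokerCompressionCodec.brokerCompressionOptions is rejected by the ConfigDef validator.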

core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (14)

@@ -194,16 +194,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
innerIter.next
}
}
}
}
/**
* Update the offsets for this message set. This method attempts to do an in-place conversion
* if there is no compression, but otherwise recopies the messages
*/
private[kafka] def assignOffsets(offsetCounter: AtomicLong, codec: CompressionCodec): ByteBufferMessageSet = {
if(codec == NoCompressionCodec) {
private[kafka] def assignOffsets(offsetCounter: AtomicLong, sourceCodec: CompressionCodec, targetCodec: CompressionCodec): ByteBufferMessageSet = {
if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// do an in-place conversion
var position = 0
buffer.mark()
@@ -217,16 +217,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
} else {
// messages are compressed, crack open the messageset and recompress with correct offset
val messages = this.internalIterator(isShallow = false).map(_.message)
new ByteBufferMessageSet(compressionCodec = codec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
}
}
/**
* The total number of bytes in this message set, including any partial trailing messages
*/
def sizeInBytes: Int = buffer.limit
/**
* The total number of bytes in this message set not including any partial, trailing messages
*/

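A small sketch of what the recompression branch produces, using the same public constructor as the new line above; the messages, codec, and base offset here are illustrative only:

    import java.util.concurrent.atomic.AtomicLong
    import kafka.message._

    val msgs = Seq(new Message("hello".getBytes), new Message("there".getBytes))
    // Rebuild the set under the target codec, continuing offsets from 1234567.
    // assignOffsets takes this path whenever either codec is not NoCompressionCodec;
    // only the all-uncompressed case is rewritten in place.
    val recompressed = new ByteBufferMessageSet(
      compressionCodec = GZIPCompressionCodec,
      offsetCounter = new AtomicLong(1234567),
      messages = msgs: _*)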
core/src/main/scala/kafka/message/CompressionCodec.scala (39)

@@ -38,29 +38,58 @@ object CompressionCodec {
}
}
object BrokerCompressionCodec {
val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains((compressionType.toLowerCase()))
def getCompressionCodec(compressionType: String): CompressionCodec = {
compressionType.toLowerCase match {
case UncompressedCodec.name => NoCompressionCodec
case _ => CompressionCodec.getCompressionCodec(compressionType)
}
}
def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec): CompressionCodec = {
if (ProducerCompressionCodec.name.equals(compressionType)) producerCompression
else getCompressionCodec(compressionType)
}
}
sealed trait CompressionCodec { def codec: Int; def name: String }
sealed trait BrokerCompressionCodec { def name: String }
case object DefaultCompressionCodec extends CompressionCodec {
case object DefaultCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = GZIPCompressionCodec.codec
val name = GZIPCompressionCodec.name
}
case object GZIPCompressionCodec extends CompressionCodec {
case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 1
val name = "gzip"
}
case object SnappyCompressionCodec extends CompressionCodec {
case object SnappyCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 2
val name = "snappy"
}
case object LZ4CompressionCodec extends CompressionCodec {
case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 3
val name = "lz4"
}
case object NoCompressionCodec extends CompressionCodec {
case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 0
val name = "none"
}
case object UncompressedCodec extends BrokerCompressionCodec {
val name = "uncompressed"
}
case object ProducerCompressionCodec extends BrokerCompressionCodec {
val name = "producer"
}
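Taken together, the new helpers resolve the configured broker option against the producer's codec. A quick sketch of the expected mappings, using snappy as an example producer codec:

    import kafka.message._

    val producerCodec: CompressionCodec = SnappyCompressionCodec
    BrokerCompressionCodec.getTargetCompressionCodec("producer", producerCodec)      // SnappyCompressionCodec (retained)
    BrokerCompressionCodec.getTargetCompressionCodec("uncompressed", producerCodec)  // NoCompressionCodec
    BrokerCompressionCodec.getTargetCompressionCodec("gzip", producerCodec)          // GZIPCompressionCodec
    BrokerCompressionCodec.isValid("abc")                                            // false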

core/src/main/scala/kafka/server/KafkaConfig.scala (11)

@@ -21,6 +21,8 @@ import java.util.Properties
import kafka.message.{MessageSet, Message}
import kafka.consumer.ConsumerConfig
import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
import kafka.message.NoCompressionCodec
import kafka.message.BrokerCompressionCodec
/**
* Configuration settings for the kafka server
@@ -345,4 +347,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */
val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
/**
* This parameter allows you to specify the broker-side compression logic. This config is used to
* retain/remove/change the compression set by the producer. This config takes the following options:
* uncompressed, gzip, snappy, lz4, producer. uncompressed means that regardless of what the producer sets, the broker
* writes the message decompressed. producer means the broker attempts to retain whatever is used by the producer
*/
val compressionType = props.getString("compression.type", "producer").toLowerCase()
require(BrokerCompressionCodec.isValid(compressionType), "compression.type: " + compressionType + " is not valid." +
" Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
}
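At the broker level the new property is read from the server configuration; a minimal sketch mirroring the new KafkaConfigTest cases below (TestUtils.createBrokerConfig is used here only to obtain a base Properties object):

    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils

    val props = TestUtils.createBrokerConfig(0, 8181)
    props.put("compression.type", "gzip")  // one of: uncompressed, gzip, snappy, lz4, producer
    val config = new KafkaConfig(props)
    // config.compressionType == "gzip"; an invalid value such as "abc" makes the
    // require(...) above throw an IllegalArgumentException at construction time.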

core/src/main/scala/kafka/server/KafkaServer.scala (3)

@@ -329,7 +329,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
deleteRetentionMs = config.logCleanerDeleteRetentionMs,
fileDeleteDelayMs = config.logDeleteDelayMs,
minCleanableRatio = config.logCleanerMinCleanRatio,
compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
compact = config.logCleanupPolicy.trim.toLowerCase == "compact",
compressionType = config.compressionType)
val defaultProps = defaultLogConfig.toProps
val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
// read the log configurations from zookeeper

core/src/test/scala/kafka/log/LogConfigTest.scala (2)

@@ -34,6 +34,7 @@ class LogConfigTest extends JUnit3Suite {
Assert.assertEquals(4242, config.segmentSize)
Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize)
Assert.assertEquals("producer", config.compressionType)
}
@Test
@@ -50,6 +51,7 @@ class LogConfigTest extends JUnit3Suite {
LogConfig.configNames().foreach((name) => {
name match {
case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer","uncompressed","gzip"))
case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete))
case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)

core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala (84)

@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import kafka.utils._
import kafka.message._
import org.scalatest.junit.JUnitSuite
import org.junit._
import org.junit.Assert._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import java.util.{ Collection, ArrayList }
import kafka.server.KafkaConfig
import org.apache.kafka.common.record.CompressionType
import scala.collection.JavaConversions._
@RunWith(value = classOf[Parameterized])
class BrokerCompressionTest(messageCompression: String, brokerCompression: String) extends JUnitSuite {
var logDir: File = null
val time = new MockTime(0)
val logConfig = LogConfig()
@Before
def setUp() {
logDir = TestUtils.tempDir()
}
@After
def tearDown() {
Utils.rm(logDir)
}
/**
* Test broker-side compression configuration
*/
@Test
def testBrokerSideCompression() {
val messageCompressionCode = CompressionCodec.getCompressionCodec(messageCompression)
/*configure broker-side compression */
val log = new Log(logDir, logConfig.copy(compressionType = brokerCompression), recoveryPoint = 0L, time.scheduler, time = time)
/* append two messages */
log.append(new ByteBufferMessageSet(messageCompressionCode, new Message("hello".getBytes), new Message("there".getBytes)))
def readMessage(offset: Int) = log.read(offset, 4096).messageSet.head.message
if (!brokerCompression.equals("producer")) {
val brokerCompressionCode = BrokerCompressionCodec.getCompressionCodec(brokerCompression)
assertEquals("Compression at offset 0 should produce " + brokerCompressionCode.name, brokerCompressionCode, readMessage(0).compressionCodec)
}
else
assertEquals("Compression at offset 0 should produce " + messageCompressionCode.name, messageCompressionCode, readMessage(0).compressionCodec)
}
}
object BrokerCompressionTest {
@Parameters
def parameters: Collection[Array[String]] = {
for (brokerCompression <- BrokerCompressionCodec.brokerCompressionOptions;
messageCompression <- CompressionType.values
) yield Array(messageCompression.name, brokerCompression)
}
}

core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (6)

@@ -147,11 +147,11 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
// check uncompressed offsets
checkOffsets(messages, 0)
var offset = 1234567
checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec), offset)
checkOffsets(messages.assignOffsets(new AtomicLong(offset), NoCompressionCodec , NoCompressionCodec), offset)
// check compressed messages
checkOffsets(compressedMessages, 0)
checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec), offset)
checkOffsets(compressedMessages.assignOffsets(new AtomicLong(offset), DefaultCompressionCodec , DefaultCompressionCodec), offset)
}
/* check that offsets are assigned based on byte offset from the given base offset */

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala (28)

@@ -21,6 +21,8 @@ import org.junit.Test
import junit.framework.Assert._
import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils
import kafka.message.GZIPCompressionCodec
import kafka.message.NoCompressionCodec
class KafkaConfigTest extends JUnit3Suite {
@@ -180,6 +182,30 @@ class KafkaConfigTest extends JUnit3Suite {
assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis )
}
@Test
def testDefaultCompressionType() {
val props = TestUtils.createBrokerConfig(0, 8181)
val serverConfig = new KafkaConfig(props)
assertEquals(serverConfig.compressionType, "producer")
}
@Test
def testValidCompressionType() {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("compression.type", "gzip")
val serverConfig = new KafkaConfig(props)
assertEquals(serverConfig.compressionType, "gzip")
}
@Test
def testInvalidCompressionType() {
val props = TestUtils.createBrokerConfig(0, 8181)
props.put("compression.type", "abc")
intercept[IllegalArgumentException] {
new KafkaConfig(props)
}
}
}
