Browse Source

kafka-1646; Improve consumer read performance for Windows; patched by Honghai Chen; reviewed by Jay Kreps and Jun Rao

pull/71/head
Honghai Chen 10 years ago committed by Jun Rao
parent
commit
ca758252c5
  1. 58
      core/src/main/scala/kafka/log/FileMessageSet.scala
  2. 30
      core/src/main/scala/kafka/log/Log.scala
  3. 2
      core/src/main/scala/kafka/log/LogCleaner.scala
  4. 6
      core/src/main/scala/kafka/log/LogConfig.scala
  5. 4
      core/src/main/scala/kafka/log/LogSegment.scala
  6. 5
      core/src/main/scala/kafka/server/KafkaConfig.scala
  7. 1
      core/src/main/scala/kafka/server/KafkaServer.scala
  8. 10
      core/src/main/scala/kafka/utils/CoreUtils.scala
  9. 55
      core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
  10. 1
      core/src/test/scala/unit/kafka/log/LogConfigTest.scala
  11. 54
      core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
  12. 1
      core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala

58
core/src/main/scala/kafka/log/FileMessageSet.scala

@ -54,7 +54,7 @@ class FileMessageSet private[kafka](@volatile var file: File, @@ -54,7 +54,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
/* if this is not a slice, update the file pointer to the end of the file */
if (!isSlice)
/* set the file position to the last byte in the file */
channel.position(channel.size)
channel.position(math.min(channel.size().toInt, end))
/**
* Create a file message set with no slicing.
@ -66,12 +66,25 @@ class FileMessageSet private[kafka](@volatile var file: File, @@ -66,12 +66,25 @@ class FileMessageSet private[kafka](@volatile var file: File,
* Create a file message set with no slicing
*/
def this(file: File) =
this(file, CoreUtils.openChannel(file, mutable = true))
this(file, FileMessageSet.openChannel(file, mutable = true))
/**
* Create a file message set with no slicing, and with initFileSize and preallocate.
* For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
* with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
* If it's new file and preallocate is true, end will be set to 0. Otherwise set to Int.MaxValue.
*/
def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
this(file,
channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
start = 0,
end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
isSlice = false)
/**
* Create a file message set with mutable option
*/
def this(file: File, mutable: Boolean) = this(file, CoreUtils.openChannel(file, mutable))
def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable))
/**
* Create a slice view of the file message set that begins and ends at the given byte offsets
@ -223,9 +236,17 @@ class FileMessageSet private[kafka](@volatile var file: File, @@ -223,9 +236,17 @@ class FileMessageSet private[kafka](@volatile var file: File,
*/
def close() {
flush()
trim()
channel.close()
}
/**
* Trim file when close or roll to next file
*/
def trim() {
truncateTo(sizeInBytes())
}
/**
* Delete this message set from the filesystem
* @return True iff this message set was deleted.
@ -272,6 +293,37 @@ class FileMessageSet private[kafka](@volatile var file: File, @@ -272,6 +293,37 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
}
object FileMessageSet
{
/**
* Open a channel for the given file
* For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
* with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
* @param file File path
* @param mutable mutable
* @param fileAlreadyExists File already exists or not
* @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
* @param preallocate Pre allocate file or not, gotten from configuration.
*/
def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
if (mutable) {
if (fileAlreadyExists)
new RandomAccessFile(file, "rw").getChannel()
else {
if (preallocate) {
val randomAccessFile = new RandomAccessFile(file, "rw")
randomAccessFile.setLength(initFileSize)
randomAccessFile.getChannel()
}
else
new RandomAccessFile(file, "rw").getChannel()
}
}
else
new FileInputStream(file).getChannel()
}
}
object LogFlushStats extends KafkaMetricsGroup {
val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))

30
core/src/main/scala/kafka/log/Log.scala

@ -78,6 +78,13 @@ class Log(val dir: File, @@ -78,6 +78,13 @@ class Log(val dir: File,
/* last time it was flushed */
private val lastflushedTime = new AtomicLong(time.milliseconds)
def initFileSize() : Int = {
if (config.preallocate)
config.segmentSize
else
0
}
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
loadSegments()
@ -168,7 +175,8 @@ class Log(val dir: File, @@ -168,7 +175,8 @@ class Log(val dir: File,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time)
time = time,
fileAlreadyExists = true)
if(!hasIndex) {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(config.maxMessageSize)
@ -205,7 +213,10 @@ class Log(val dir: File, @@ -205,7 +213,10 @@ class Log(val dir: File,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time))
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize(),
preallocate = config.preallocate))
} else {
recoverLog()
// reset the index size of the currently active log segment to allow more entries
@ -586,14 +597,20 @@ class Log(val dir: File, @@ -586,14 +597,20 @@ class Log(val dir: File,
segments.lastEntry() match {
case null =>
case entry => entry.getValue.index.trimToValidSize()
case entry => {
entry.getValue.index.trimToValidSize()
entry.getValue.log.trim()
}
}
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time)
time = time,
fileAlreadyExists = false,
initFileSize = initFileSize,
preallocate = config.preallocate)
val prev = addSegment(segment)
if(prev != null)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
@ -687,7 +704,10 @@ class Log(val dir: File, @@ -687,7 +704,10 @@ class Log(val dir: File,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time))
time = time,
fileAlreadyExists = false,
initFileSize = initFileSize,
preallocate = config.preallocate))
updateLogEndOffset(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
}

2
core/src/main/scala/kafka/log/LogCleaner.scala

@ -344,7 +344,7 @@ private[log] class Cleaner(val id: Int, @@ -344,7 +344,7 @@ private[log] class Cleaner(val id: Int,
logFile.delete()
val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
indexFile.delete()
val messages = new FileMessageSet(logFile)
val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)

6
core/src/main/scala/kafka/log/LogConfig.scala

@ -43,6 +43,7 @@ object Defaults { @@ -43,6 +43,7 @@ object Defaults {
val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
val CompressionType = kafka.server.Defaults.CompressionType
val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
}
case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) {
@ -64,6 +65,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi @@ -64,6 +65,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@ -95,6 +97,7 @@ object LogConfig { @@ -95,6 +97,7 @@ object LogConfig {
val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
val CompressionTypeProp = "compression.type"
val PreAllocateEnableProp = "preallocate"
val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
@ -118,6 +121,7 @@ object LogConfig { @@ -118,6 +121,7 @@ object LogConfig {
val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " +
"standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " +
"no compression; and 'producer' which means retain the original compression codec set by the producer."
val PreAllocateEnableDoc ="Should pre allocate file when create new segment?"
private val configDef = {
import ConfigDef.Range._
@ -149,6 +153,8 @@ object LogConfig { @@ -149,6 +153,8 @@ object LogConfig {
MEDIUM, UncleanLeaderElectionEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
.define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
.define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable,
MEDIUM, PreAllocateEnableDoc)
}
def apply(): LogConfig = LogConfig(new Properties())

4
core/src/main/scala/kafka/log/LogSegment.scala

@ -52,8 +52,8 @@ class LogSegment(val log: FileMessageSet, @@ -52,8 +52,8 @@ class LogSegment(val log: FileMessageSet,
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
startOffset,
indexIntervalBytes,

5
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -83,6 +83,7 @@ object Defaults { @@ -83,6 +83,7 @@ object Defaults {
val LogDeleteDelayMs = 60000
val LogFlushSchedulerIntervalMs = Long.MaxValue
val LogFlushOffsetCheckpointIntervalMs = 60000
val LogPreAllocateEnable = false
val NumRecoveryThreadsPerDataDir = 1
val AutoCreateTopicsEnable = true
val MinInSyncReplicas = 1
@ -206,6 +207,7 @@ object KafkaConfig { @@ -206,6 +207,7 @@ object KafkaConfig {
val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
val LogFlushIntervalMsProp = "log.flush.interval.ms"
val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
val LogPreAllocateProp = "log.preallocate"
val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
@ -332,6 +334,7 @@ object KafkaConfig { @@ -332,6 +334,7 @@ object KafkaConfig {
val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk"
val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk"
val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point"
val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true."
val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)"
@ -466,6 +469,7 @@ object KafkaConfig { @@ -466,6 +469,7 @@ object KafkaConfig {
.define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc)
.define(LogFlushIntervalMsProp, LONG, HIGH, LogFlushIntervalMsDoc, false)
.define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc)
.define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc)
.define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
.define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
@ -609,6 +613,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka @@ -609,6 +613,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
val logPreAllocateEnable: Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)

1
core/src/main/scala/kafka/server/KafkaServer.scala

@ -443,6 +443,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -443,6 +443,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
case KafkaConfig.LogPreAllocateProp => logProps.put(LogConfig.PreAllocateEnableProp, entry.getValue)
case _ => // we just leave those out
}
}

10
core/src/main/scala/kafka/utils/CoreUtils.scala

@ -69,16 +69,6 @@ object CoreUtils extends Logging { @@ -69,16 +69,6 @@ object CoreUtils extends Logging {
def daemonThread(name: String, fun: => Unit): Thread =
Utils.daemonThread(name, runnable(fun))
/**
* Open a channel for the given file
*/
def openChannel(file: File, mutable: Boolean): FileChannel = {
if(mutable)
new RandomAccessFile(file, "rw").getChannel()
else
new FileInputStream(file).getChannel()
}
/**
* Do the given action and log any exceptions thrown without rethrowing them
* @param log The log method to use for logging. E.g. logger.warn

55
core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package kafka.log
import java.io._
import java.nio._
import java.util.concurrent.atomic._
import junit.framework.Assert._
@ -146,5 +147,57 @@ class FileMessageSetTest extends BaseMessageSetTestCases { @@ -146,5 +147,57 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
assertEquals(List(message), messageSet.toList)
assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
}
/**
* Test the new FileMessageSet with pre allocate as true
*/
@Test
def testPreallocateTrue() {
val temp = tempFile()
val set = new FileMessageSet(temp, false, 512 *1024 *1024, true)
val position = set.channel.position
val size = set.sizeInBytes()
assertEquals(0, position)
assertEquals(0, size)
assertEquals(512 *1024 *1024, temp.length)
}
/**
* Test the new FileMessageSet with pre allocate as false
*/
@Test
def testPreallocateFalse() {
val temp = tempFile()
val set = new FileMessageSet(temp, false, 512 *1024 *1024, false)
val position = set.channel.position
val size = set.sizeInBytes()
assertEquals(0, position)
assertEquals(0, size)
assertEquals(0, temp.length)
}
/**
* Test the new FileMessageSet with pre allocate as true and file has been clearly shut down, the file will be truncate to end of valid data.
*/
@Test
def testPreallocateClearShutdown() {
val temp = tempFile()
val set = new FileMessageSet(temp, false, 512 *1024 *1024, true)
set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
val oldposition = set.channel.position
val oldsize = set.sizeInBytes()
assertEquals(messageSet.sizeInBytes, oldposition)
assertEquals(messageSet.sizeInBytes, oldsize)
set.close()
val tempReopen = new File(temp.getAbsolutePath())
val setReopen = new FileMessageSet(tempReopen, true, 512 *1024 *1024, true)
val position = setReopen.channel.position
val size = setReopen.sizeInBytes()
assertEquals(oldposition, position)
assertEquals(oldposition, size)
assertEquals(oldposition, tempReopen.length)
}
}

1
core/src/test/scala/unit/kafka/log/LogConfigTest.scala

@ -45,6 +45,7 @@ class LogConfigTest extends JUnit3Suite { @@ -45,6 +45,7 @@ class LogConfigTest extends JUnit3Suite {
case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
case LogConfig.PreAllocateEnableProp => expected.setProperty(name, randFrom("true", "false"))
case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
}
})

54
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala

@ -226,5 +226,57 @@ class LogSegmentTest extends JUnit3Suite { @@ -226,5 +226,57 @@ class LogSegmentTest extends JUnit3Suite {
seg.delete()
}
}
/* create a segment with pre allocate */
def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): LogSegment = {
val tempDir = TestUtils.tempDir()
val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
segments += seg
seg
}
/* create a segment with pre allocate, put message to it and verify */
@Test
def testCreateWithInitFileSizeAppendMessage() {
val seg = createSegment(40, false, 512*1024*1024, true)
val ms = messages(50, "hello", "there")
seg.append(50, ms)
val ms2 = messages(60, "alpha", "beta")
seg.append(60, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
assertEquals(ms2.toList, read.messageSet.toList)
}
/* create a segment with pre allocate and clearly shut down*/
@Test
def testCreateWithInitFileSizeClearShutdown() {
val tempDir = TestUtils.tempDir()
val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true)
val ms = messages(50, "hello", "there")
seg.append(50, ms)
val ms2 = messages(60, "alpha", "beta")
seg.append(60, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
assertEquals(ms2.toList, read.messageSet.toList)
val oldSize = seg.log.sizeInBytes()
val oldPosition = seg.log.channel.position
val oldFileSize = seg.log.file.length
assertEquals(512*1024*1024, oldFileSize)
seg.close()
//After close, file should be trimed
assertEquals(oldSize, seg.log.file.length)
val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true, 512*1024*1024, true)
segments += segReopen
val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
assertEquals(ms2.toList, readAgain.messageSet.toList)
val size = segReopen.log.sizeInBytes()
val position = segReopen.log.channel.position
val fileSize = segReopen.log.file.length
assertEquals(oldPosition, position)
assertEquals(oldSize, size)
assertEquals(size, fileSize)
}
}

1
core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala

@ -192,6 +192,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { @@ -192,6 +192,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
case KafkaConfig.MinInSyncReplicasProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.AutoLeaderRebalanceEnableProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.LogPreAllocateProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.InterBrokerSecurityProtocolProp => expected.setProperty(name, SecurityProtocol.PLAINTEXT.toString)
case KafkaConfig.InterBrokerProtocolVersionProp => expected.setProperty(name, ApiVersion.latestVersion.toString)

Loading…
Cancel
Save