From fb21209b5ad30001eeace56b3c8ab060e0ceb021 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Wed, 2 Aug 2017 16:05:26 +0100 Subject: [PATCH] KAFKA-5663; Pass non-null logDirFailureChannel to Log.apply Also: - Improve logging - Remove dangerous default arguments in Log.apply - Improve naming of methods and fields in LogDirFailureChannel - Some clean-ups Author: Dong Lin Reviewers: Jiangjie (Becket) Qin , Apurva Mehta , Ismael Juma Closes #3594 from lindong28/KAFKA-5663 --- core/src/main/scala/kafka/log/Log.scala | 12 +- .../src/main/scala/kafka/log/LogCleaner.scala | 4 +- core/src/main/scala/kafka/log/LogConfig.scala | 3 +- .../src/main/scala/kafka/log/LogManager.scala | 43 +- .../kafka/server/LogDirFailureChannel.scala | 28 +- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../server/checkpoints/CheckpointFile.scala | 10 +- .../kafka/api/LogDirFailureTest.scala | 34 +- .../scala/other/kafka/StressTestLog.scala | 7 +- .../other/kafka/TestLinearWriteSpeed.scala | 27 +- .../unit/kafka/cluster/ReplicaTest.scala | 11 +- .../AbstractLogCleanerIntegrationTest.scala | 5 +- .../kafka/log/BrokerCompressionTest.scala | 6 +- .../kafka/log/LogCleanerManagerTest.scala | 11 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 6 +- .../test/scala/unit/kafka/log/LogTest.scala | 794 +++++++----------- 16 files changed, 441 insertions(+), 562 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0610e87f51e..60ec7a09ba6 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1600,7 +1600,7 @@ class Log(@volatile var dir: File, fun } catch { case e: IOException => - logDirFailureChannel.maybeAddLogFailureEvent(dir.getParent) + logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e) throw new KafkaStorageException(msg, e) } } @@ -1649,14 +1649,14 @@ object Log { def apply(dir: File, config: LogConfig, - logStartOffset: Long = 0L, - recoveryPoint: Long = 0L, + logStartOffset: Long, + recoveryPoint: Long, scheduler: Scheduler, brokerTopicStats: BrokerTopicStats, time: Time = Time.SYSTEM, - maxProducerIdExpirationMs: Int = 60 * 60 * 1000, - producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000, - logDirFailureChannel: LogDirFailureChannel = null): Log = { + maxProducerIdExpirationMs: Int, + producerIdExpirationCheckIntervalMs: Int, + logDirFailureChannel: LogDirFailureChannel): Log = { val topicPartition = Log.parseTopicPartitionName(dir) val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs, diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 27da43b8680..85d6487cfe9 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -265,8 +265,8 @@ class LogCleaner(val config: CleanerConfig, } catch { case _: LogCleaningAbortedException => // task can be aborted, let it go. case e: IOException => - error(s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException", e) - logDirFailureChannel.maybeAddLogFailureEvent(cleanable.log.dir.getParent) + val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException" + logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e) } finally { cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) } diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index ad50aabe156..8f82e65a751 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -47,6 +47,7 @@ object Defaults { val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio val Compact = kafka.server.Defaults.LogCleanupPolicy + val CleanupPolicy = kafka.server.Defaults.LogCleanupPolicy val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas val CompressionType = kafka.server.Defaults.CompressionType @@ -235,7 +236,7 @@ object LogConfig { KafkaConfig.LogDeleteDelayMsProp) .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM, MinCleanableRatioDoc, KafkaConfig.LogCleanerMinCleanRatioProp) - .define(CleanupPolicyProp, LIST, Defaults.Compact, ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc, + .define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy, ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc, KafkaConfig.LogCleanupPolicyProp) .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, MEDIUM, UncleanLeaderElectionEnableDoc, KafkaConfig.UncleanLeaderElectionEnableProp) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index f459cc10846..88a0e21e69f 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -194,8 +194,7 @@ class LogManager(logDirs: Array[File], Some(lock) } catch { case e: IOException => - error(s"Disk error while locking directory $dir", e) - logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath) + logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e) None } } @@ -214,9 +213,11 @@ class LogManager(logDirs: Array[File], logStartOffset = logStartOffset, recoveryPoint = logRecoveryPoint, maxProducerIdExpirationMs = maxPidExpirationMs, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, scheduler = scheduler, time = time, - brokerTopicStats = brokerTopicStats) + brokerTopicStats = brokerTopicStats, + logDirFailureChannel = logDirFailureChannel) if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { this.logsToBeDeleted.add(current) @@ -237,7 +238,7 @@ class LogManager(logDirs: Array[File], info("Loading logs.") val startMs = time.milliseconds val threadPools = ArrayBuffer.empty[ExecutorService] - val offlineDirs = ArrayBuffer.empty[String] + val offlineDirs = ArrayBuffer.empty[(String, IOException)] val jobs = mutable.Map.empty[File, Seq[Future[_]]] for (dir <- liveLogDirs) { @@ -283,7 +284,7 @@ class LogManager(logDirs: Array[File], loadLogs(logDir, recoveryPoints, logStartOffsets) } catch { case e: IOException => - offlineDirs.append(dir.getAbsolutePath) + offlineDirs.append((dir.getAbsolutePath, e)) error("Error while loading log dir " + dir.getAbsolutePath, e) } } @@ -291,7 +292,7 @@ class LogManager(logDirs: Array[File], jobs(cleanShutdownFile) = jobsForDir.map(pool.submit) } catch { case e: IOException => - offlineDirs.append(dir.getAbsolutePath) + offlineDirs.append((dir.getAbsolutePath, e)) error("Error while loading log dir " + dir.getAbsolutePath, e) } } @@ -303,11 +304,13 @@ class LogManager(logDirs: Array[File], cleanShutdownFile.delete() } catch { case e: IOException => - offlineDirs.append(cleanShutdownFile.getParent) + offlineDirs.append((cleanShutdownFile.getParent, e)) error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e) } } - offlineDirs.foreach(logDirFailureChannel.maybeAddLogFailureEvent) + offlineDirs.foreach { case (dir, e) => + logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e) + } } catch { case e: ExecutionException => { error("There was an error in one of the threads during logs loading: " + e.getCause) @@ -500,8 +503,7 @@ class LogManager(logDirs: Array[File], this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint))) } catch { case e: IOException => - error(s"Disk error while writing to recovery point file in directory $dir", e) - logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath) + logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point file in directory $dir", e) } } } @@ -518,8 +520,7 @@ class LogManager(logDirs: Array[File], )) } catch { case e: IOException => - error(s"Disk error while writing to logStartOffset file in directory $dir", e) - logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath) + logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e) } } } @@ -555,9 +556,12 @@ class LogManager(logDirs: Array[File], logStartOffset = 0L, recoveryPoint = 0L, maxProducerIdExpirationMs = maxPidExpirationMs, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, scheduler = scheduler, time = time, - brokerTopicStats = brokerTopicStats) + brokerTopicStats = brokerTopicStats, + logDirFailureChannel = logDirFailureChannel) + logs.put(topicPartition, log) info("Created log for partition [%s,%d] in %s with properties {%s}." @@ -568,8 +572,9 @@ class LogManager(logDirs: Array[File], log } catch { case e: IOException => - logDirFailureChannel.maybeAddLogFailureEvent(dataDir.getAbsolutePath) - throw new KafkaStorageException(s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}", e) + val msg = s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}" + logDirFailureChannel.maybeAddOfflineLogDir(dataDir.getAbsolutePath, msg, e) + throw new KafkaStorageException(msg, e) } } } @@ -635,8 +640,9 @@ class LogManager(logDirs: Array[File], } } catch { case e: IOException => - logDirFailureChannel.maybeAddLogFailureEvent(removedLog.dir.getParent) - throw new KafkaStorageException(s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}.", e) + val msg = s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}." + logDirFailureChannel.maybeAddOfflineLogDir(removedLog.dir.getParent, msg, e) + throw new KafkaStorageException(msg, e) } } else if (offlineLogDirs.nonEmpty) { throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(",")) @@ -727,6 +733,9 @@ class LogManager(logDirs: Array[File], } object LogManager { + + val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000 + def apply(config: KafkaConfig, initialOfflineDirs: Seq[String], zkUtils: ZkUtils, diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala index 23d99860528..c78f04ecf82 100644 --- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala +++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala @@ -18,29 +18,35 @@ package kafka.server +import java.io.IOException import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} +import kafka.utils.Logging + /* - * LogDirFailureChannel allows an external thread to block waiting for new offline log dir. + * LogDirFailureChannel allows an external thread to block waiting for new offline log dirs. * - * LogDirFailureChannel should be a singleton object which can be accessed by any class that does disk-IO operation. - * If IOException is encountered while accessing a log directory, the corresponding class can insert the the log directory name - * to the LogDirFailureChannel using maybeAddLogFailureEvent(). Then a thread which is blocked waiting for new offline log directories - * can take the name of the new offline log directory out of the LogDirFailureChannel and handles the log failure properly. + * There should be a single instance of LogDirFailureChannel accessible by any class that does disk-IO operation. + * If IOException is encountered while accessing a log directory, the corresponding class can add the log directory name + * to the LogDirFailureChannel using maybeAddOfflineLogDir(). Each log directory will be added only once. After a log + * directory is added for the first time, a thread which is blocked waiting for new offline log directories + * can take the name of the new offline log directory out of the LogDirFailureChannel and handle the log failure properly. + * An offline log directory will stay offline until the broker is restarted. * */ -class LogDirFailureChannel(logDirNum: Int) { +class LogDirFailureChannel(logDirNum: Int) extends Logging { private val offlineLogDirs = new ConcurrentHashMap[String, String] - private val logDirFailureEvent = new ArrayBlockingQueue[String](logDirNum) + private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum) /* * If the given logDir is not already offline, add it to the * set of offline log dirs and enqueue it to the logDirFailureEvent queue */ - def maybeAddLogFailureEvent(logDir: String): Unit = { + def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = { + error(msg, e) if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) { - logDirFailureEvent.add(logDir) + offlineLogDirQueue.add(logDir) } } @@ -48,8 +54,6 @@ class LogDirFailureChannel(logDirNum: Int) { * Get the next offline log dir from logDirFailureEvent queue. * The method will wait if necessary until a new offline log directory becomes available */ - def takeNextLogFailureEvent(): String = { - logDirFailureEvent.take() - } + def takeNextOfflineLogDir(): String = offlineLogDirQueue.take() } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b66aba0f4ce..11e53447f8a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -189,7 +189,7 @@ class ReplicaManager(val config: KafkaConfig, private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) { override def doWork() { - val newOfflineLogDir = logDirFailureChannel.takeNextLogFailureEvent() + val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir() if (haltBrokerOnDirFailure) { fatal(s"Halting broker because dir $newOfflineLogDir is offline") Exit.halt(1) diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala index 7b67559d2d2..4c1011f9429 100644 --- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala @@ -72,8 +72,9 @@ class CheckpointFile[T](val file: File, Utils.atomicMoveWithFallback(tempPath, path) } catch { case e: IOException => - logDirFailureChannel.maybeAddLogFailureEvent(logDir) - throw new KafkaStorageException(s"Error while writing to checkpoint file ${file.getAbsolutePath}", e) + val msg = s"Error while writing to checkpoint file ${file.getAbsolutePath}" + logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e) + throw new KafkaStorageException(msg, e) } } } @@ -119,8 +120,9 @@ class CheckpointFile[T](val file: File, } } catch { case e: IOException => - logDirFailureChannel.maybeAddLogFailureEvent(logDir) - throw new KafkaStorageException(s"Error while reading checkpoint file ${file.getAbsolutePath}", e) + val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}" + logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e) + throw new KafkaStorageException(msg, e) } } } diff --git a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala index 04be8fd78e2..6749a573f38 100644 --- a/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala +++ b/core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala @@ -34,6 +34,9 @@ import org.junit.Assert.assertTrue * Test whether clients can producer and consume when there is log directory failure */ class LogDirFailureTest extends IntegrationTestHarness { + + import kafka.api.LogDirFailureTest._ + val producerCount: Int = 1 val consumerCount: Int = 1 val serverCount: Int = 2 @@ -42,7 +45,8 @@ class LogDirFailureTest extends IntegrationTestHarness { this.logDirCount = 2 this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0") this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") - this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "100") + this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000") + @Before override def setUp() { @@ -51,8 +55,16 @@ class LogDirFailureTest extends IntegrationTestHarness { } @Test - def testProduceAfterLogDirFailure() { + def testIOExceptionDuringLogRoll() { + testProduceAfterLogDirFailure(Roll) + } + + @Test + def testIOExceptionDuringCheckpoint() { + testProduceAfterLogDirFailure(Checkpoint) + } + def testProduceAfterLogDirFailure(failureType: LogDirFailureType) { val consumer = consumers.head subscribeAndWaitForAssignment(topic, consumer) val producer = producers.head @@ -75,6 +87,17 @@ class LogDirFailureTest extends IntegrationTestHarness { logDir.createNewFile() assertTrue(logDir.isFile) + if (failureType == Roll) { + try { + leaderServer.replicaManager.getLog(partition).get.roll() + fail("Log rolling should fail with KafkaStorageException") + } catch { + case e: KafkaStorageException => // This is expected + } + } else if (failureType == Checkpoint) { + leaderServer.replicaManager.checkpointHighWatermarks() + } + // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline TestUtils.waitUntilTrue(() => !leaderServer.logManager.liveLogDirs.contains(logDir), "Expected log directory offline", 3000L) assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty) @@ -123,3 +146,10 @@ class LogDirFailureTest extends IntegrationTestHarness { } } + +object LogDirFailureTest { + sealed trait LogDirFailureType + case object Roll extends LogDirFailureType + case object Checkpoint extends LogDirFailureType +} + diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 5355ca2222f..1710da7b9c3 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -21,7 +21,7 @@ import java.util.Properties import java.util.concurrent.atomic._ import kafka.log._ -import kafka.server.BrokerTopicStats +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils._ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException import org.apache.kafka.common.record.FileRecords @@ -49,7 +49,10 @@ object StressTestLog { recoveryPoint = 0L, scheduler = time.scheduler, time = time, - brokerTopicStats = new BrokerTopicStats) + maxProducerIdExpirationMs = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, + brokerTopicStats = new BrokerTopicStats, + logDirFailureChannel = new LogDirFailureChannel(10)) val writer = new WriterThread(log) writer.start() val reader = new ReaderThread(log) diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index f211c4c0884..e05f29d533b 100755 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -25,7 +25,7 @@ import java.util.{Properties, Random} import joptsimple._ import kafka.log._ import kafka.message._ -import kafka.server.BrokerTopicStats +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils._ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.{Time, Utils} @@ -85,9 +85,9 @@ object TestLinearWriteSpeed { val mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.") val channelOpt = parser.accepts("channel", "Do writes to file channels.") val logOpt = parser.accepts("log", "Do writes to kafka logs.") - + val options = parser.parse(args : _*) - + CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt, filesOpt) var bytesToWrite = options.valueOf(bytesOpt).longValue @@ -125,14 +125,14 @@ object TestLinearWriteSpeed { logProperties.put(LogConfig.FlushMessagesProp, flushInterval: java.lang.Long) writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet) } else { - System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") + System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") Exit.exit(1) } } bytesToWrite = (bytesToWrite / numFiles) * numFiles - + println("%10s\t%10s\t%10s".format("mb_sec", "avg_latency", "max_latency")) - + val beginTest = System.nanoTime var maxLatency = 0L var totalLatency = 0L @@ -170,12 +170,12 @@ object TestLinearWriteSpeed { println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec") scheduler.shutdown() } - + trait Writable { def write(): Int def close() } - + class MmapWritable(val file: File, size: Long, val content: ByteBuffer) extends Writable { file.deleteOnExit() val raf = new RandomAccessFile(file, "rw") @@ -190,7 +190,7 @@ object TestLinearWriteSpeed { raf.close() } } - + class ChannelWritable(val file: File, val content: ByteBuffer) extends Writable { file.deleteOnExit() val raf = new RandomAccessFile(file, "rw") @@ -204,10 +204,11 @@ object TestLinearWriteSpeed { raf.close() } } - + class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable { Utils.delete(dir) - val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM) + val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM, 60 * 60 * 1000, + LogManager.ProducerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10)) def write(): Int = { log.appendAsLeader(messages, leaderEpoch = 0) messages.sizeInBytes @@ -217,5 +218,5 @@ object TestLinearWriteSpeed { Utils.delete(log.dir) } } - + } diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala index 839b9d9d232..b3d44682a75 100644 --- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala @@ -18,8 +18,8 @@ package kafka.cluster import java.util.Properties -import kafka.log.{Log, LogConfig} -import kafka.server.{BrokerTopicStats, LogOffsetMetadata} +import kafka.log.{Log, LogConfig, LogManager} +import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.OffsetOutOfRangeException @@ -49,7 +49,10 @@ class ReplicaTest { recoveryPoint = 0L, scheduler = time.scheduler, brokerTopicStats = brokerTopicStats, - time = time) + time = time, + maxProducerIdExpirationMs = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, + logDirFailureChannel = new LogDirFailureChannel(10)) replica = new Replica(brokerId = 0, topicPartition = new TopicPartition("foo", 0), @@ -108,7 +111,7 @@ class ReplicaTest { assertTrue(replica.logStartOffset <= hw) // verify that all segments up to the high watermark have been deleted - + log.logSegments.headOption.foreach { segment => assertTrue(segment.baseOffset <= hw) assertTrue(segment.baseOffset >= replica.logStartOffset) diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index d6f0a5616fe..34baf89ede0 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -97,7 +97,10 @@ abstract class AbstractLogCleanerIntegrationTest { recoveryPoint = 0L, scheduler = time.scheduler, time = time, - brokerTopicStats = new BrokerTopicStats) + brokerTopicStats = new BrokerTopicStats, + maxProducerIdExpirationMs = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, + logDirFailureChannel = new LogDirFailureChannel(10)) logMap.put(partition, log) this.logs += log } diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 9c727c6c61e..1cf393e82aa 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec import org.apache.kafka.common.utils.Utils import java.util.{Collection, Properties} -import kafka.server.BrokerTopicStats +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import scala.collection.JavaConverters._ @@ -56,7 +56,9 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin logProps.put(LogConfig.CompressionTypeProp, brokerCompression) /*configure broker-side compression */ val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - time = time, brokerTopicStats = new BrokerTopicStats) + time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, + logDirFailureChannel = new LogDirFailureChannel(10)) /* append two messages */ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index f4eabc0b518..c9f544178d1 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -20,7 +20,7 @@ package kafka.log import java.io.File import java.util.Properties -import kafka.server.BrokerTopicStats +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record._ @@ -236,13 +236,18 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { recoveryPoint = 0L, scheduler = time.scheduler, time = time, - brokerTopicStats = new BrokerTopicStats) + brokerTopicStats = new BrokerTopicStats, + maxProducerIdExpirationMs = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, + logDirFailureChannel = new LogDirFailureChannel(10)) log } private def makeLog(dir: File = logDir, config: LogConfig = logConfig) = Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - time = time, brokerTopicStats = new BrokerTopicStats) + time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, + logDirFailureChannel = new LogDirFailureChannel(10)) private def records(key: Int, value: Int, timestamp: Long) = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes)) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 689a0320676..c29ece58fdc 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -23,7 +23,7 @@ import java.nio.file.Paths import java.util.Properties import kafka.common._ -import kafka.server.BrokerTopicStats +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record._ @@ -1238,7 +1238,9 @@ class LogCleanerTest extends JUnitSuite { private def makeLog(dir: File = dir, config: LogConfig = logConfig, recoveryPoint: Long = 0L) = Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler, - time = time, brokerTopicStats = new BrokerTopicStats) + time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, + logDirFailureChannel = new LogDirFailureChannel(10)) private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 13ce19d77ab..ad64e39a0a5 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -23,12 +23,11 @@ import java.nio.file.Files import java.util.Properties import org.apache.kafka.common.errors._ -import kafka.api.ApiVersion import kafka.common.KafkaException import org.junit.Assert._ import org.junit.{After, Before, Test} import kafka.utils._ -import kafka.server.{BrokerTopicStats, KafkaConfig} +import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel} import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -36,7 +35,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.IsolationLevel -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.{Time, Utils} import org.easymock.EasyMock import scala.collection.JavaConverters._ @@ -46,9 +45,8 @@ class LogTest { val tmpDir = TestUtils.tempDir() val logDir = TestUtils.randomPartitionLogDir(tmpDir) - val time = new MockTime() + val mockTime = new MockTime() var config: KafkaConfig = null - val logConfig = LogConfig() val brokerTopicStats = new BrokerTopicStats @Before @@ -94,23 +92,13 @@ class LogTest { @Test def testTimeBasedLogRoll() { def createRecords = TestUtils.singletonRecords("test".getBytes) - - val logProps = new Properties() - logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long) - logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString) + val logConfig = createLogConfig(segmentMs = 1 * 60 * 60L) // create a log - val log = Log(logDir, - LogConfig(logProps), - logStartOffset = 0L, - recoveryPoint = 0L, - maxProducerIdExpirationMs = 24 * 60, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) // Test the segment rolling behavior when messages do not have a timestamp. - time.sleep(log.config.segmentMs + 1) + mockTime.sleep(log.config.segmentMs + 1) log.appendAsLeader(createRecords, leaderEpoch = 0) assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments) @@ -118,56 +106,48 @@ class LogTest { assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments) for (numSegments <- 3 until 5) { - time.sleep(log.config.segmentMs + 1) + mockTime.sleep(log.config.segmentMs + 1) log.appendAsLeader(createRecords, leaderEpoch = 0) assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments) } // Append a message with timestamp to a segment whose first message do not have a timestamp. - val timestamp = time.milliseconds + log.config.segmentMs + 1 + val timestamp = mockTime.milliseconds + log.config.segmentMs + 1 def createRecordsWithTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = timestamp) log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0) assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments) // Test the segment rolling behavior when messages have timestamps. - time.sleep(log.config.segmentMs + 1) + mockTime.sleep(log.config.segmentMs + 1) log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0) assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments) // move the wall clock beyond log rolling time - time.sleep(log.config.segmentMs + 1) + mockTime.sleep(log.config.segmentMs + 1) log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0) assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments) - val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) log.appendAsLeader(recordWithExpiredTimestamp, leaderEpoch = 0) assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments) val numSegments = log.numberOfSegments - time.sleep(log.config.segmentMs + 1) + mockTime.sleep(log.config.segmentMs + 1) log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE), leaderEpoch = 0) assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments) } @Test(expected = classOf[OutOfOrderSequenceException]) def testNonSequentialAppend(): Unit = { - val logProps = new Properties() - // create a log - val log = Log(logDir, - LogConfig(logProps), - recoveryPoint = 0L, - scheduler = time.scheduler, - time = time, - brokerTopicStats = new BrokerTopicStats) - + val log = createLog(logDir, LogConfig()) val pid = 1L val epoch: Short = 0 - val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 0) + val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 0) log.appendAsLeader(records, leaderEpoch = 0) - val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 2) + val nextRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 2) log.appendAsLeader(nextRecords, leaderEpoch = 0) } @@ -175,12 +155,12 @@ class LogTest { def testInitializationOfProducerSnapshotsUpgradePath(): Unit = { // simulate the upgrade path by creating a new log with several segments, deleting the // snapshot files, and then reloading the log - - val log = createLog(64, messagesPerSegment = 10) + val logConfig = createLogConfig(segmentBytes = 64 * 10) + val log = createLog(logDir, logConfig) assertEquals(None, log.oldestProducerSnapshotOffset) for (i <- 0 to 100) { - val record = new SimpleRecord(time.milliseconds, i.toString.getBytes) + val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) } @@ -191,7 +171,7 @@ class LogTest { Files.delete(file.toPath) } - val reloadedLog = createLog(64, messagesPerSegment = 10) + val reloadedLog = createLog(logDir, logConfig) val expectedSnapshotsOffsets = log.logSegments.toSeq.reverse.take(2).map(_.baseOffset) ++ Seq(reloadedLog.logEndOffset) expectedSnapshotsOffsets.foreach { offset => assertTrue(Log.producerSnapshotFile(logDir, offset).exists) @@ -213,8 +193,9 @@ class LogTest { @Test def testPidMapOffsetUpdatedForNonIdempotentData() { - val log = createLog(2048) - val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes))) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) + val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes))) log.appendAsLeader(records, leaderEpoch = 0) log.takeProducerSnapshot() assertEquals(Some(1), log.latestProducerSnapshotOffset) @@ -248,9 +229,9 @@ class LogTest { config, logStartOffset = 0L, recoveryPoint = 0L, - scheduler = time.scheduler, + scheduler = mockTime.scheduler, brokerTopicStats = brokerTopicStats, - time = time, + time = mockTime, maxProducerIdExpirationMs = 300000, producerIdExpirationCheckIntervalMs = 30000, topicPartition = Log.parseTopicPartitionName(logDir), @@ -317,9 +298,9 @@ class LogTest { config, logStartOffset = 0L, recoveryPoint = 0L, - scheduler = time.scheduler, + scheduler = mockTime.scheduler, brokerTopicStats = brokerTopicStats, - time = time, + time = mockTime, maxProducerIdExpirationMs = 300000, producerIdExpirationCheckIntervalMs = 30000, topicPartition = Log.parseTopicPartitionName(logDir), @@ -352,9 +333,9 @@ class LogTest { config, logStartOffset = 0L, recoveryPoint = 0L, - scheduler = time.scheduler, + scheduler = mockTime.scheduler, brokerTopicStats = brokerTopicStats, - time = time, + time = mockTime, maxProducerIdExpirationMs = 300000, producerIdExpirationCheckIntervalMs = 30000, topicPartition = Log.parseTopicPartitionName(logDir), @@ -388,9 +369,9 @@ class LogTest { config, logStartOffset = 0L, recoveryPoint = 0L, - scheduler = time.scheduler, + scheduler = mockTime.scheduler, brokerTopicStats = brokerTopicStats, - time = time, + time = mockTime, maxProducerIdExpirationMs = 300000, producerIdExpirationCheckIntervalMs = 30000, topicPartition = Log.parseTopicPartitionName(logDir), @@ -403,7 +384,8 @@ class LogTest { @Test def testRebuildPidMapWithCompactedData() { - val log = createLog(2048) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) val pid = 1L val epoch = 0.toShort val seq = 0 @@ -448,7 +430,8 @@ class LogTest { @Test def testRebuildProducerStateWithEmptyCompactedBatch() { - val log = createLog(2048) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) val pid = 1L val epoch = 0.toShort val seq = 0 @@ -491,7 +474,8 @@ class LogTest { @Test def testUpdatePidMapWithCompactedData() { - val log = createLog(2048) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) val pid = 1L val epoch = 0.toShort val seq = 0 @@ -526,7 +510,8 @@ class LogTest { @Test def testPidMapTruncateTo() { - val log = createLog(2048) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0) log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0) log.takeProducerSnapshot() @@ -546,8 +531,8 @@ class LogTest { @Test def testPidMapTruncateToWithNoSnapshots() { // This ensures that the upgrade optimization path cannot be hit after initial loading - - val log = createLog(2048) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) val pid = 1L val epoch = 0.toShort @@ -573,14 +558,15 @@ class LogTest { @Test def testLoadProducersAfterDeleteRecordsMidSegment(): Unit = { - val log = createLog(2048) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) val pid1 = 1L val pid2 = 2L val epoch = 0.toShort - log.appendAsLeader(TestUtils.records(List(new SimpleRecord(time.milliseconds(), "a".getBytes)), producerId = pid1, + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1, producerEpoch = epoch, sequence = 0), leaderEpoch = 0) - log.appendAsLeader(TestUtils.records(List(new SimpleRecord(time.milliseconds(), "b".getBytes)), producerId = pid2, + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2, producerEpoch = epoch, sequence = 0), leaderEpoch = 0) assertEquals(2, log.activeProducers.size) @@ -593,7 +579,7 @@ class LogTest { log.close() - val reloadedLog = createLog(2048, logStartOffset = 1L) + val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L) assertEquals(1, reloadedLog.activeProducers.size) val reloadedEntryOpt = log.activeProducers.get(pid2) assertEquals(retainedEntryOpt, reloadedEntryOpt) @@ -601,15 +587,16 @@ class LogTest { @Test def testLoadProducersAfterDeleteRecordsOnSegment(): Unit = { - val log = createLog(2048) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) val pid1 = 1L val pid2 = 2L val epoch = 0.toShort - log.appendAsLeader(TestUtils.records(List(new SimpleRecord(time.milliseconds(), "a".getBytes)), producerId = pid1, + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1, producerEpoch = epoch, sequence = 0), leaderEpoch = 0) log.roll() - log.appendAsLeader(TestUtils.records(List(new SimpleRecord(time.milliseconds(), "b".getBytes)), producerId = pid2, + log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2, producerEpoch = epoch, sequence = 0), leaderEpoch = 0) assertEquals(2, log.logSegments.size) @@ -627,7 +614,7 @@ class LogTest { log.close() - val reloadedLog = createLog(2048, logStartOffset = 1L) + val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L) assertEquals(1, reloadedLog.activeProducers.size) val reloadedEntryOpt = log.activeProducers.get(pid2) assertEquals(retainedEntryOpt, reloadedEntryOpt) @@ -636,7 +623,8 @@ class LogTest { @Test def testPidMapTruncateFullyAndStartAt() { val records = TestUtils.singletonRecords("foo".getBytes) - val log = createLog(records.sizeInBytes, messagesPerSegment = 1, retentionBytes = records.sizeInBytes * 2) + val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) + val log = createLog(logDir, logConfig) log.appendAsLeader(records, leaderEpoch = 0) log.takeProducerSnapshot() @@ -658,7 +646,8 @@ class LogTest { def testPidExpirationOnSegmentDeletion() { val pid1 = 1L val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0) - val log = createLog(records.sizeInBytes, messagesPerSegment = 1, retentionBytes = records.sizeInBytes * 2) + val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) + val log = createLog(logDir, logConfig) log.appendAsLeader(records, leaderEpoch = 0) log.takeProducerSnapshot() @@ -681,7 +670,8 @@ class LogTest { @Test def testTakeSnapshotOnRollAndDeleteSnapshotOnFlush() { - val log = createLog(2048) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) log.roll(1L) assertEquals(Some(1L), log.latestProducerSnapshotOffset) @@ -710,7 +700,8 @@ class LogTest { @Test def testRebuildTransactionalState(): Unit = { - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val pid = 137L val epoch = 5.toShort @@ -731,7 +722,7 @@ class LogTest { log.close() - val reopenedLog = createLog(1024 * 1024) + val reopenedLog = createLog(logDir, logConfig) reopenedLog.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1) assertEquals(None, reopenedLog.firstUnstableOffset) } @@ -743,57 +734,50 @@ class LogTest { coordinatorEpoch: Int = 0, partitionLeaderEpoch: Int = 0): MemoryRecords = { val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch) - MemoryRecords.withEndTransactionMarker(offset, time.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker) + MemoryRecords.withEndTransactionMarker(offset, mockTime.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker) } @Test def testPeriodicPidExpiration() { - val maxPidExpirationMs = 200 - val expirationCheckInterval = 100 + val maxProducerIdExpirationMs = 200 + val producerIdExpirationCheckIntervalMs = 100 val pid = 23L - val log = createLog(2048, maxPidExpirationMs = maxPidExpirationMs, - pidExpirationCheckIntervalMs = expirationCheckInterval) - val records = Seq(new SimpleRecord(time.milliseconds(), "foo".getBytes)) + val logConfig = createLogConfig(segmentBytes = 2048 * 5) + val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = maxProducerIdExpirationMs, + producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs) + val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes)) log.appendAsLeader(TestUtils.records(records, producerId = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0) assertEquals(Set(pid), log.activeProducers.keySet) - time.sleep(expirationCheckInterval) + mockTime.sleep(producerIdExpirationCheckIntervalMs) assertEquals(Set(pid), log.activeProducers.keySet) - time.sleep(expirationCheckInterval) + mockTime.sleep(producerIdExpirationCheckIntervalMs) assertEquals(Set(), log.activeProducers.keySet) } @Test def testDuplicateAppends(): Unit = { - val logProps = new Properties() - // create a log - val log = Log(logDir, - LogConfig(logProps), - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) - + val log = createLog(logDir, LogConfig()) val pid = 1L val epoch: Short = 0 var seq = 0 // Pad the beginning of the log. for (_ <- 0 to 5) { - val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), + val record = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = seq) log.appendAsLeader(record, leaderEpoch = 0) seq = seq + 1 } // Append an entry with multiple log records. def createRecords = TestUtils.records(List( - new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), - new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), - new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes) + new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), + new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), + new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes) ), producerId = pid, producerEpoch = epoch, sequence = seq) val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0) assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3) @@ -811,8 +795,8 @@ class LogTest { try { val records = TestUtils.records( List( - new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), - new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)), + new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), + new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)), producerId = pid, producerEpoch = epoch, sequence = seq - 2) log.appendAsLeader(records, leaderEpoch = 0) fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " + @@ -824,7 +808,7 @@ class LogTest { // Append a Duplicate of an entry in the middle of the log. This is not allowed. try { val records = TestUtils.records( - List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)), + List(new SimpleRecord(mockTime.milliseconds, s"key-1".getBytes, s"value-1".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 1) log.appendAsLeader(records, leaderEpoch = 0) fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " + @@ -834,7 +818,7 @@ class LogTest { } // Append a duplicate entry with a single records at the tail of the log. This should return the appendInfo of the original entry. - def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), + def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = seq) val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0) val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0) @@ -844,36 +828,29 @@ class LogTest { @Test def testMultiplePidsPerMemoryRecord() : Unit = { - val logProps = new Properties() - // create a log - val log = Log(logDir, - LogConfig(logProps), - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val log = createLog(logDir, LogConfig()) val epoch: Short = 0 val buffer = ByteBuffer.allocate(512) var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0, false, 0) + TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), 1L, epoch, 0, false, 0) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0, false, 0) + TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), 2L, epoch, 0, false, 0) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 3L, epoch, 0, false, 0) + TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), 3L, epoch, 0, false, 0) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 4L, epoch, 0, false, 0) + TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), 4L, epoch, 0, false, 0) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() @@ -897,7 +874,8 @@ class LogTest { @Test(expected = classOf[DuplicateSequenceNumberException]) def testDuplicateAppendToFollower() : Unit = { - val log = createLog(1024*1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val epoch: Short = 0 val pid = 1L val baseSequence = 0 @@ -912,7 +890,8 @@ class LogTest { @Test(expected = classOf[DuplicateSequenceNumberException]) def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = { - val log = createLog(1024*1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val pid1 = 1L val pid2 = 2L @@ -922,31 +901,31 @@ class LogTest { // pid1 seq = 0 var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), pid1, epoch, 0) + TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), pid1, epoch, 0) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() // pid2 seq = 0 builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), pid2, epoch, 0) + TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), pid2, epoch, 0) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() // pid1 seq = 1 builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), pid1, epoch, 1) + TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), pid1, epoch, 1) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() // pid2 seq = 1 builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), pid2, epoch, 1) + TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), pid2, epoch, 1) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() // // pid1 seq = 1 (duplicate) builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, - TimestampType.LOG_APPEND_TIME, 4L, time.milliseconds(), pid1, epoch, 1) + TimestampType.LOG_APPEND_TIME, 4L, mockTime.milliseconds(), pid1, epoch, 1) builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) builder.close() @@ -961,24 +940,16 @@ class LogTest { @Test(expected = classOf[ProducerFencedException]) def testOldProducerEpoch(): Unit = { - val logProps = new Properties() - // create a log - val log = Log(logDir, - LogConfig(logProps), - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) - + val log = createLog(logDir, LogConfig()) val pid = 1L val newEpoch: Short = 1 val oldEpoch: Short = 0 - val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = newEpoch, sequence = 0) + val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = newEpoch, sequence = 0) log.appendAsLeader(records, leaderEpoch = 0) - val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = oldEpoch, sequence = 0) + val nextRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = oldEpoch, sequence = 0) log.appendAsLeader(nextRecords, leaderEpoch = 0) } @@ -988,29 +959,20 @@ class LogTest { */ @Test def testTimeBasedLogRollJitter() { - var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val maxJitter = 20 * 60L - - val logProps = new Properties() - logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long) - logProps.put(LogConfig.SegmentJitterMsProp, maxJitter: java.lang.Long) // create a log - val log = Log(logDir, - LogConfig(logProps), - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val logConfig = createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter) + val log = createLog(logDir, logConfig) assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) log.appendAsLeader(set, leaderEpoch = 0) - time.sleep(log.config.segmentMs - maxJitter) - set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + mockTime.sleep(log.config.segmentMs - maxJitter) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) log.appendAsLeader(set, leaderEpoch = 0) assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments) - time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1) - set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1) + set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) log.appendAsLeader(set, leaderEpoch = 0) assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments) } @@ -1020,18 +982,13 @@ class LogTest { */ @Test def testSizeBasedLogRoll() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val setSize = createRecords.sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages - - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) - // We use need to use magic value 1 here because the test is message size sensitive. - logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) // create a log - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = segmentSize) + val log = createLog(logDir, logConfig) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -1046,9 +1003,8 @@ class LogTest { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - val log = Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) - log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds), leaderEpoch = 0) + val log = createLog(logDir, LogConfig()) + log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0) } /** @@ -1056,12 +1012,8 @@ class LogTest { */ @Test def testAppendAndReadWithSequentialOffsets() { - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) - // We use need to use magic value 1 here because the test is message size sensitive. - logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 71) + val log = createLog(logDir, logConfig) val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray for(value <- values) @@ -1084,10 +1036,8 @@ class LogTest { */ @Test def testAppendAndReadWithNonSequentialOffsets() { - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 72) + val log = createLog(logDir, logConfig) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -1110,10 +1060,8 @@ class LogTest { */ @Test def testReadAtLogGap() { - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 300) + val log = createLog(logDir, logConfig) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -1128,10 +1076,8 @@ class LogTest { @Test def testReadWithMinMessage() { - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 72) + val log = createLog(logDir, logConfig) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -1157,10 +1103,8 @@ class LogTest { @Test def testReadWithTooSmallMaxLength() { - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 72) + val log = createLog(logDir, logConfig) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) @@ -1192,12 +1136,9 @@ class LogTest { @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) - val logProps = new Properties() - // set up replica log starting with offset 1024 and with one message (at offset 1024) - logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 1024) + val log = createLog(logDir, logConfig) log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0) assertEquals("Reading at the log end offset should produce 0 byte read.", 0, @@ -1228,13 +1169,11 @@ class LogTest { @Test def testLogRolls() { /* create a multipart log with 100 messages */ - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 100) + val log = createLog(logDir, logConfig) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes, - timestamp = time.milliseconds)) + timestamp = mockTime.milliseconds)) messageSets.foreach(log.appendAsLeader(_, leaderEpoch = 0)) log.flush() @@ -1268,10 +1207,8 @@ class LogTest { @Test def testCompressedMessages() { /* this log should roll after every messageset */ - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 110) + val log = createLog(logDir, logConfig) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0) @@ -1294,13 +1231,10 @@ class LogTest { for(messagesToAppend <- List(0, 1, 25)) { logDir.mkdirs() // first test a log segment starting at 0 - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) - logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 100, retentionMs = 0) + val log = createLog(logDir, logConfig) for(i <- 0 until messagesToAppend) - log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10), leaderEpoch = 0) + log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0) val currOffset = log.logEndOffset assertEquals(currOffset, messagesToAppend) @@ -1315,7 +1249,7 @@ class LogTest { assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset) assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", currOffset, - log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = time.milliseconds), leaderEpoch = 0).firstOffset) + log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0).firstOffset) // cleanup the log log.delete() @@ -1331,12 +1265,8 @@ class LogTest { val messageSet = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes)) // append messages to log val configSegmentSize = messageSet.sizeInBytes - 1 - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer) - // We use need to use magic value 1 here because the test is message size sensitive. - logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = configSegmentSize) + val log = createLog(logDir, logConfig) try { log.appendAsLeader(messageSet, leaderEpoch = 0) @@ -1360,11 +1290,8 @@ class LogTest { val messageSetWithKeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage) val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage) - val logProps = new Properties() - logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(cleanupPolicy = LogConfig.Compact) + val log = createLog(logDir, logConfig) try { log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0) @@ -1404,10 +1331,8 @@ class LogTest { // append messages to log val maxMessageSize = second.sizeInBytes - 1 - val logProps = new Properties() - logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer) - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(maxMessageBytes = maxMessageSize) + val log = createLog(logDir, logConfig) // should be able to append the small message log.appendAsLeader(first, leaderEpoch = 0) @@ -1428,16 +1353,11 @@ class LogTest { val messageSize = 100 val segmentSize = 7 * messageSize val indexInterval = 3 * messageSize - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer) - logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer) - val config = LogConfig(logProps) - var log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = indexInterval, segmentIndexBytes = 4096) + var log = createLog(logDir, logConfig) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize), - timestamp = time.milliseconds + i * 10), leaderEpoch = 0) + timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) val lastIndexOffset = log.activeSegment.index.lastOffset val numIndexEntries = log.activeSegment.index.entries @@ -1460,14 +1380,12 @@ class LogTest { assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries) } - log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + log = createLog(logDir, logConfig, recoveryPoint = lastOffset) verifyRecoveredLog(log) log.close() // test recovery case - log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + log = createLog(logDir, logConfig) verifyRecoveredLog(log) log.close() } @@ -1478,22 +1396,17 @@ class LogTest { @Test def testBuildTimeIndexWhenNotAssigningOffsets() { val numMessages = 100 - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 10000: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - - val config = LogConfig(logProps) - val log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 10000, indexIntervalBytes = 1) + val log = createLog(logDir, logConfig) val messages = (0 until numMessages).map { i => - MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes())) + MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(mockTime.milliseconds + i, i.toString.getBytes())) } messages.foreach(log.appendAsFollower) val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries } assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries) - assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}", - time.milliseconds + numMessages - 1, log.activeSegment.timeIndex.lastEntry.timestamp) + assertEquals(s"The last time index entry should have timestamp ${mockTime.milliseconds + numMessages - 1}", + mockTime.milliseconds + numMessages - 1, log.activeSegment.timeIndex.lastEntry.timestamp) } /** @@ -1503,15 +1416,10 @@ class LogTest { def testIndexRebuild() { // publish the messages and close the log val numMessages = 200 - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - - val config = LogConfig(logProps) - var log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) + var log = createLog(logDir, logConfig) for(i <- 0 until numMessages) - log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0) + log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) val indexFiles = log.logSegments.map(_.index.file) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() @@ -1521,17 +1429,16 @@ class LogTest { timeIndexFiles.foreach(_.delete()) // reopen the log - log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + log = createLog(logDir, logConfig) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0) assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) for(i <- 0 until numMessages) { assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset) if (i == 0) - assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) + assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset) else - assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) + assertEquals(i, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset) } log.close() } @@ -1543,17 +1450,11 @@ class LogTest { def testRebuildTimeIndexForOldMessages() { val numMessages = 200 val segmentSize = 200 - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0") - - val config = LogConfig(logProps) - var log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = 1, messageFormatVersion = "0.9.0") + var log = createLog(logDir, logConfig) for(i <- 0 until numMessages) log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), - timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0) + timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() @@ -1561,8 +1462,7 @@ class LogTest { timeIndexFiles.foreach(_.delete()) // The rebuilt time index should be empty - log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1) val segArray = log.logSegments.toArray for (i <- segArray.indices.init) { assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries) @@ -1577,15 +1477,10 @@ class LogTest { def testCorruptIndexRebuild() { // publish the messages and close the log val numMessages = 200 - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - - val config = LogConfig(logProps) - var log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) + var log = createLog(logDir, logConfig) for(i <- 0 until numMessages) - log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0) + log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) val indexFiles = log.logSegments.map(_.index.file) val timeIndexFiles = log.logSegments.map(_.timeIndex.file) log.close() @@ -1605,15 +1500,14 @@ class LogTest { } // reopen the log - log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + log = createLog(logDir, logConfig, recoveryPoint = 200L) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) { assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset) if (i == 0) - assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) + assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset) else - assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) + assertEquals(i, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset) } log.close() } @@ -1623,17 +1517,14 @@ class LogTest { */ @Test def testTruncateTo() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val setSize = createRecords.sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * setSize // each segment will be 10 messages - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) - // create a log - val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = segmentSize) + val log = createLog(logDir, logConfig) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (_ <- 1 to msgPerSeg) @@ -1681,24 +1572,20 @@ class LogTest { */ @Test def testIndexResizingAtTruncation() { - val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds).sizeInBytes + val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds).sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * setSize // each segment will be 10 messages - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer) - val config = LogConfig(logProps) - val log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val logConfig = createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1) + val log = createLog(logDir, logConfig) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) - log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0) + log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) - time.sleep(msgPerSeg) + mockTime.sleep(msgPerSeg) for (i<- 1 to msgPerSeg) - log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0) + log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments) val expectedEntries = msgPerSeg - 1 @@ -1710,9 +1597,9 @@ class LogTest { assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.index.maxEntries) assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/12, log.logSegments.toList.head.timeIndex.maxEntries) - time.sleep(msgPerSeg) + mockTime.sleep(msgPerSeg) for (i<- 1 to msgPerSeg) - log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0) + log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) } @@ -1726,18 +1613,9 @@ class LogTest { val bogusIndex2 = Log.offsetIndexFile(logDir, 5) val bogusTimeIndex2 = Log.timeIndexFile(logDir, 5) - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer) - logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - val log = Log(logDir, - LogConfig(logProps), - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1) + val log = createLog(logDir, logConfig) assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0) @@ -1756,33 +1634,16 @@ class LogTest { */ @Test def testReopenThenTruncate() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer) - logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer) - val config = LogConfig(logProps) - + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) // create a log - var log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000) + var log = createLog(logDir, logConfig) // add enough messages to roll over several segments then close and re-open and attempt to truncate for (_ <- 0 until 100) log.appendAsLeader(createRecords, leaderEpoch = 0) log.close() - log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + log = createLog(logDir, logConfig) log.truncateTo(3) assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) assertEquals("Log end offset should be 3.", 3, log.logEndOffset) @@ -1793,23 +1654,11 @@ class LogTest { */ @Test def testAsyncDelete() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000L) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000L) val asyncDeleteMs = 1000 - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer) - logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer) - logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer) - logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer) - val config = LogConfig(logProps) - - val log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000, + retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 100) @@ -1831,7 +1680,7 @@ class LogTest { // when enough time passes the files should be deleted val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file) - time.sleep(asyncDeleteMs + 1) + mockTime.sleep(asyncDeleteMs + 1) assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists)) } @@ -1840,19 +1689,9 @@ class LogTest { */ @Test def testOpenDeletesObsoleteFiles() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer) - logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) - logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer) - val config = LogConfig(logProps) - var log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) + var log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 100) @@ -1862,26 +1701,13 @@ class LogTest { log.onHighWatermarkIncremented(log.logEndOffset) log.deleteOldSegments() log.close() - - log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + log = createLog(logDir, logConfig) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) } @Test def testAppendMessageWithNullPayload() { - val log = Log(logDir, - LogConfig(), - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val log = createLog(logDir, LogConfig()) log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0) val head = log.readUncommitted(0, 4096, None).records.records.iterator.next() assertEquals(0, head.offset) @@ -1890,13 +1716,7 @@ class LogTest { @Test(expected = classOf[IllegalArgumentException]) def testAppendWithOutOfOrderOffsetsThrowsException() { - val log = Log(logDir, - LogConfig(), - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val log = createLog(logDir, LogConfig()) val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0)) val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes)) @@ -1905,13 +1725,7 @@ class LogTest { @Test def testAppendWithNoTimestamp(): Unit = { - val log = Log(logDir, - LogConfig(), - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val log = createLog(logDir, LogConfig()) log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0) } @@ -1919,23 +1733,13 @@ class LogTest { @Test def testCorruptLog() { // append some messages to create some segments - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer) - val config = LogConfig(logProps) - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val recoveryPoint = 50L for (_ <- 0 until 10) { // create a log and write some messages to it logDir.mkdirs() - var log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + var log = createLog(logDir, logConfig) val numMessages = 50 + TestUtils.random.nextInt(50) for (_ <- 0 until numMessages) log.appendAsLeader(createRecords, leaderEpoch = 0) @@ -1947,7 +1751,7 @@ class LogTest { TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1) // attempt recovery - log = Log(logDir, config, 0L, recoveryPoint, time.scheduler, brokerTopicStats, time) + log = createLog(logDir, logConfig, 0L, recoveryPoint) assertEquals(numMessages, log.logEndOffset) val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList @@ -1968,18 +1772,8 @@ class LogTest { @Test def testOverCompactedLogRecovery(): Unit = { // append some messages to create some segments - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer) - logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - val config = LogConfig(logProps) - val log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) + val log = createLog(logDir, logConfig) val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes())) val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes())) val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, 0, new SimpleRecord("v4".getBytes(), "k4".getBytes())) @@ -2012,24 +1806,14 @@ class LogTest { @Test def testCleanShutdownFile() { // append some messages to create some segments - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer) - logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer) - logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) - val config = LogConfig(logProps) - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) + val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) val cleanShutdownFile = createCleanShutdownFile() assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists()) var recoveryPoint = 0L // create a log and write some messages to it - var log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + var log = createLog(logDir, logConfig) for (_ <- 0 until 100) log.appendAsLeader(createRecords, leaderEpoch = 0) log.close() @@ -2037,7 +1821,7 @@ class LogTest { // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the // clean shutdown file exists. recoveryPoint = log.logEndOffset - log = Log(logDir, config, 0L, 0L, time.scheduler, brokerTopicStats, time) + log = createLog(logDir, logConfig) assertEquals(recoveryPoint, log.logEndOffset) cleanShutdownFile.delete() } @@ -2196,19 +1980,9 @@ class LogTest { @Test def testDeleteOldSegments() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000) - val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer) - logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) - logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer) - val config = LogConfig(logProps) - val log = Log(logDir, - config, - logStartOffset = 0L, - recoveryPoint = 0L, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 100) @@ -2257,7 +2031,8 @@ class LogTest { @Test def testLogDeletionAfterDeleteRecords() { def createRecords = TestUtils.singletonRecords("test".getBytes) - val log = createLog(createRecords.sizeInBytes) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5) + val log = createLog(logDir, logConfig) for (_ <- 0 until 15) log.appendAsLeader(createRecords, leaderEpoch = 0) @@ -2288,7 +2063,8 @@ class LogTest { @Test def shouldDeleteSizeBasedSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) - val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2302,7 +2078,8 @@ class LogTest { @Test def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() { def createRecords = TestUtils.singletonRecords("test".getBytes) - val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 15) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2316,7 +2093,8 @@ class LogTest { @Test def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() { def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10) - val log = createLog(createRecords.sizeInBytes, retentionMs = 10000) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2329,8 +2107,9 @@ class LogTest { @Test def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() { - def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = time.milliseconds) - val log = createLog(createRecords.sizeInBytes, retentionMs = 10000000) + def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000) + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2344,16 +2123,15 @@ class LogTest { @Test def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() { def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) - val log = createLog(createRecords.sizeInBytes, - retentionMs = 10000, - cleanupPolicy = "compact") + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact") + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) log.appendAsLeader(createRecords, leaderEpoch = 0) // mark oldest segment as older the retention.ms - log.logSegments.head.lastModified = time.milliseconds - 20000 + log.logSegments.head.lastModified = mockTime.milliseconds - 20000 val segments = log.numberOfSegments log.onHighWatermarkIncremented(log.logEndOffset) @@ -2364,9 +2142,8 @@ class LogTest { @Test def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() { def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) - val log = createLog(createRecords.sizeInBytes, - retentionMs = 10000, - cleanupPolicy = "compact,delete") + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete") + val log = createLog(logDir, logConfig) // append some messages to create some segments for (_ <- 0 until 15) @@ -2383,8 +2160,7 @@ class LogTest { //Given this partition is on leader epoch 72 val epoch = 72 - val log = Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val log = createLog(logDir, LogConfig()) log.leaderEpochCache.assign(epoch, records.size) //When appending messages as a leader (i.e. assignOffsets = true) @@ -2416,8 +2192,7 @@ class LogTest { recs } - val log = Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val log = createLog(logDir, LogConfig()) //When appending as follower (assignOffsets = false) for (i <- records.indices) @@ -2429,7 +2204,8 @@ class LogTest { @Test def shouldTruncateLeaderEpochsWhenDeletingSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) - val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) + val log = createLog(logDir, logConfig) val cache = epochCache(log) // Given three segments of 5 messages each @@ -2453,7 +2229,8 @@ class LogTest { @Test def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() { def createRecords = TestUtils.singletonRecords("test".getBytes) - val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10) + val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) + val log = createLog(logDir, logConfig) val cache = epochCache(log) // Given three segments of 5 messages each @@ -2476,10 +2253,9 @@ class LogTest { @Test def shouldTruncateLeaderEpochFileWhenTruncatingLog() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) - val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * createRecords.sizeInBytes).toString) - val log = Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) + val logConfig = createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes) + val log = createLog(logDir, logConfig) val cache = epochCache(log) //Given 2 segments, 10 messages per segment @@ -2524,8 +2300,7 @@ class LogTest { */ @Test def testLogRecoversForLeaderEpoch() { - val log = Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val log = createLog(logDir, LogConfig()) val leaderEpochCache = epochCache(log) val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0) log.appendAsFollower(records = firstBatch) @@ -2547,8 +2322,7 @@ class LogTest { log.close() // reopen the log and recover from the beginning - val recoveredLog = Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, time = time) + val recoveredLog = createLog(logDir, LogConfig()) val recoveredLeaderEpochCache = epochCache(recoveredLog) // epoch entries should be recovered @@ -2576,7 +2350,8 @@ class LogTest { } def testFirstUnstableOffsetNoTransactionalData() { - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("foo".getBytes), @@ -2589,7 +2364,8 @@ class LogTest { @Test def testFirstUnstableOffsetWithTransactionalData() { - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val pid = 137L val epoch = 5.toShort @@ -2626,7 +2402,8 @@ class LogTest { @Test def testTransactionIndexUpdated(): Unit = { - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid1 = 1L @@ -2666,7 +2443,8 @@ class LogTest { @Test def testFullTransactionIndexRecovery(): Unit = { - val log = createLog(128) + val logConfig = createLogConfig(segmentBytes = 128 * 5) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid1 = 1L @@ -2708,14 +2486,16 @@ class LogTest { log.close() - val reloadedLog = createLog(1024) + val reloadedLogConfig = createLogConfig(segmentBytes = 1024 * 5) + val reloadedLog = createLog(logDir, reloadedLogConfig) val abortedTransactions = allAbortedTransactions(reloadedLog) assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) } @Test def testRecoverOnlyLastSegment(): Unit = { - val log = createLog(128) + val logConfig = createLogConfig(segmentBytes = 128 * 5) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid1 = 1L @@ -2757,14 +2537,16 @@ class LogTest { log.close() - val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint) + val reloadedLogConfig = createLogConfig(segmentBytes = 1024 * 5) + val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint) val abortedTransactions = allAbortedTransactions(reloadedLog) assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) } @Test def testRecoverLastSegmentWithNoSnapshots(): Unit = { - val log = createLog(128) + val logConfig = createLogConfig(segmentBytes = 128 * 5) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid1 = 1L @@ -2812,7 +2594,8 @@ class LogTest { log.close() - val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint) + val reloadedLogConfig = createLogConfig(segmentBytes = 1024 * 5) + val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint) val abortedTransactions = allAbortedTransactions(reloadedLog) assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) } @@ -2820,7 +2603,8 @@ class LogTest { @Test def testTransactionIndexUpdatedThroughReplication(): Unit = { val epoch = 0.toShort - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val buffer = ByteBuffer.allocate(2048) val pid1 = 1L @@ -2865,7 +2649,8 @@ class LogTest { def testZombieCoordinatorFenced(): Unit = { val pid = 1L val epoch = 0.toShort - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val append = appendTransactionalAsLeader(log, pid, epoch) @@ -2880,7 +2665,8 @@ class LogTest { @Test def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = { - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid = 1L val appendPid = appendTransactionalAsLeader(log, pid, epoch) @@ -2903,7 +2689,8 @@ class LogTest { @Test def testFirstUnstableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = { - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) val epoch = 0.toShort val pid = 1L val appendPid = appendTransactionalAsLeader(log, pid, epoch) @@ -2929,7 +2716,8 @@ class LogTest { @Test def testLastStableOffsetWithMixedProducerData() { - val log = createLog(1024 * 1024) + val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) // for convenience, both producers share the same epoch val epoch = 5.toShort @@ -2989,7 +2777,8 @@ class LogTest { new SimpleRecord("b".getBytes), new SimpleRecord("c".getBytes)) - val log = createLog(messageSizeInBytes = records.sizeInBytes, messagesPerSegment = 1) + val logConfig = createLogConfig(segmentBytes = records.sizeInBytes) + val log = createLog(logDir, logConfig) val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0) assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset)) @@ -3019,26 +2808,52 @@ class LogTest { assertEquals(new AbortedTransaction(pid, 0), fetchDataInfo.abortedTransactions.get.head) } - private def createLog(messageSizeInBytes: Int, retentionMs: Int = -1, retentionBytes: Int = -1, - cleanupPolicy: String = "delete", messagesPerSegment: Int = 5, - maxPidExpirationMs: Int = 300000, pidExpirationCheckIntervalMs: Int = 30000, - recoveryPoint: Long = 0L, logStartOffset: Long = 0L): Log = { + def createLogConfig(segmentMs: Long = Defaults.SegmentMs, + segmentBytes: Int = Defaults.SegmentSize, + retentionMs: Long = Defaults.RetentionMs, + retentionBytes: Long = Defaults.RetentionSize, + segmentJitterMs: Long = Defaults.SegmentJitterMs, + cleanupPolicy: String = Defaults.CleanupPolicy, + maxMessageBytes: Int = Defaults.MaxMessageSize, + indexIntervalBytes: Int = Defaults.IndexInterval, + segmentIndexBytes: Int = Defaults.MaxIndexSize, + messageFormatVersion: String = Defaults.MessageFormatVersion, + fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = { val logProps = new Properties() - logProps.put(LogConfig.SegmentBytesProp, messageSizeInBytes * messagesPerSegment: Integer) - logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer) - logProps.put(LogConfig.RetentionBytesProp, retentionBytes: Integer) + + logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long) + logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer) + logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long) + logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long) + logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: java.lang.Long) logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy) - logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString) - val config = LogConfig(logProps) - Log(logDir, - config, - logStartOffset = logStartOffset, - recoveryPoint = recoveryPoint, - scheduler = time.scheduler, - brokerTopicStats = brokerTopicStats, - time = time, - maxProducerIdExpirationMs = maxPidExpirationMs, - producerIdExpirationCheckIntervalMs = pidExpirationCheckIntervalMs) + logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer) + logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion) + logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long) + LogConfig(logProps) + } + + def createLog(dir: File, + config: LogConfig, + logStartOffset: Long = 0L, + recoveryPoint: Long = 0L, + scheduler: Scheduler = mockTime.scheduler, + brokerTopicStats: BrokerTopicStats = brokerTopicStats, + time: Time = mockTime, + maxProducerIdExpirationMs: Int = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs): Log = { + Log(dir = dir, + config = config, + logStartOffset = logStartOffset, + recoveryPoint = recoveryPoint, + scheduler = scheduler, + brokerTopicStats = brokerTopicStats, + time = time, + maxProducerIdExpirationMs = maxProducerIdExpirationMs, + producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs, + logDirFailureChannel = new LogDirFailureChannel(10)) } private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) @@ -3087,7 +2902,7 @@ class LogTest { private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, offset: Long, controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = { val marker = new EndTransactionMarker(controlType, coordinatorEpoch) - MemoryRecords.writeEndTransactionalMarker(buffer, offset, time.milliseconds(), 0, producerId, producerEpoch, marker) + MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), 0, producerId, producerEpoch, marker) } private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = { @@ -3111,5 +2926,4 @@ class LogTest { assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists()) cleanShutdownFile } - }