
KAFKA-5663; Pass non-null logDirFailureChannel to Log.apply

Also:
- Improve logging
- Remove dangerous default arguments in Log.apply
- Improve naming of methods and fields in LogDirFailureChannel
- Some clean-ups

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>, Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3594 from lindong28/KAFKA-5663
Commit fb21209b5a, authored by Dong Lin and committed by Ismael Juma.
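For readers skimming the diff: the "dangerous default arguments" called out above refer to the old Log.apply signature, which defaulted logDirFailureChannel to null. A caller relying on that default would compile, but the first IOException in the log's error-handling path would then surface as a NullPointerException instead of marking the directory offline. A minimal self-contained sketch of the hazard (FailureChannel and openLog are hypothetical stand-ins, not the Kafka classes):

```scala
import java.io.IOException

object NullDefaultHazard extends App {
  // Hypothetical stand-in for LogDirFailureChannel, for illustration only.
  class FailureChannel {
    def maybeAddOfflineLogDir(dir: String, msg: => String, e: IOException): Unit =
      println(s"$msg: ${e.getMessage}")
  }

  // Before the patch: a null default compiles cleanly for every caller...
  def openLog(dir: String, channel: FailureChannel = null): Unit =
    try throw new IOException("simulated disk error")
    catch {
      case e: IOException =>
        // ...but a caller that relied on the default gets a NullPointerException
        // here instead of having the offline directory recorded.
        channel.maybeAddOfflineLogDir(dir, s"I/O error while accessing $dir", e)
    }

  openLog("/tmp/kafka-logs", new FailureChannel) // reports the failure as intended
  openLog("/tmp/kafka-logs")                     // relies on the default: NullPointerException
}
```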
  1. core/src/main/scala/kafka/log/Log.scala (12 lines changed)
  2. core/src/main/scala/kafka/log/LogCleaner.scala (4 lines changed)
  3. core/src/main/scala/kafka/log/LogConfig.scala (3 lines changed)
  4. core/src/main/scala/kafka/log/LogManager.scala (43 lines changed)
  5. core/src/main/scala/kafka/server/LogDirFailureChannel.scala (28 lines changed)
  6. core/src/main/scala/kafka/server/ReplicaManager.scala (2 lines changed)
  7. core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala (10 lines changed)
  8. core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala (34 lines changed)
  9. core/src/test/scala/other/kafka/StressTestLog.scala (7 lines changed)
  10. core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (5 lines changed)
  11. core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala (9 lines changed)
  12. core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala (5 lines changed)
  13. core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala (6 lines changed)
  14. core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala (11 lines changed)
  15. core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (6 lines changed)
  16. core/src/test/scala/unit/kafka/log/LogTest.scala (794 lines changed)

core/src/main/scala/kafka/log/Log.scala (12 lines changed)

@@ -1600,7 +1600,7 @@ class Log(@volatile var dir: File,
fun
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddLogFailureEvent(dir.getParent)
logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
throw new KafkaStorageException(msg, e)
}
}
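The hunk above is the body of Log's IO-error wrapper: run the supplied block (fun), and on IOException report the parent directory to the channel before rethrowing as a storage exception. A self-contained sketch of that wrap-and-report shape (the wrapper and Reporter names, and the locally defined exception, are illustration-only assumptions; only maybeAddOfflineLogDir mirrors this patch):

```scala
import java.io.IOException

object WrapAndReport {
  // Stand-in for Kafka's storage exception, defined locally to keep the sketch self-contained.
  final class StorageException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

  trait Reporter {
    def maybeAddOfflineLogDir(dir: String, msg: => String, e: IOException): Unit
  }

  def maybeHandleIOException[T](reporter: Reporter, dir: String, msg: => String)(fun: => T): T =
    try fun
    catch {
      case e: IOException =>
        // Report the directory as offline (at most once per dir), then surface the error.
        reporter.maybeAddOfflineLogDir(dir, msg, e)
        throw new StorageException(msg, e)
    }
}
```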
@@ -1649,14 +1649,14 @@ object Log {
def apply(dir: File,
config: LogConfig,
logStartOffset: Long = 0L,
recoveryPoint: Long = 0L,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
logDirFailureChannel: LogDirFailureChannel = null): Log = {
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log = {
val topicPartition = Log.parseTopicPartitionName(dir)
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
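With the defaults removed, every call site now spells these arguments out. The shape of an updated caller, mirroring the test changes further down (the concrete values, a one-hour producer-id expiration and a 10-slot failure channel, are taken from those tests; dir, config and time are assumed surrounding fixtures):

```scala
val log = Log(
  dir = dir,
  config = config,
  logStartOffset = 0L,
  recoveryPoint = 0L,
  scheduler = time.scheduler,
  brokerTopicStats = new BrokerTopicStats,
  time = time,
  maxProducerIdExpirationMs = 60 * 60 * 1000,
  producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
  logDirFailureChannel = new LogDirFailureChannel(10))
```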

core/src/main/scala/kafka/log/LogCleaner.scala (4 lines changed)

@@ -265,8 +265,8 @@ class LogCleaner(val config: CleanerConfig,
} catch {
case _: LogCleaningAbortedException => // task can be aborted, let it go.
case e: IOException =>
error(s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException", e)
logDirFailureChannel.maybeAddLogFailureEvent(cleanable.log.dir.getParent)
val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException"
logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
} finally {
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
}

core/src/main/scala/kafka/log/LogConfig.scala (3 lines changed)

@@ -47,6 +47,7 @@ object Defaults {
val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs
val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
val Compact = kafka.server.Defaults.LogCleanupPolicy
val CleanupPolicy = kafka.server.Defaults.LogCleanupPolicy
val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
val CompressionType = kafka.server.Defaults.CompressionType
@@ -235,7 +236,7 @@ object LogConfig {
KafkaConfig.LogDeleteDelayMsProp)
.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
MinCleanableRatioDoc, KafkaConfig.LogCleanerMinCleanRatioProp)
.define(CleanupPolicyProp, LIST, Defaults.Compact, ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
.define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy, ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
KafkaConfig.LogCleanupPolicyProp)
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable,
MEDIUM, UncleanLeaderElectionEnableDoc, KafkaConfig.UncleanLeaderElectionEnableProp)

core/src/main/scala/kafka/log/LogManager.scala (43 lines changed)

@@ -194,8 +194,7 @@ class LogManager(logDirs: Array[File],
Some(lock)
} catch {
case e: IOException =>
error(s"Disk error while locking directory $dir", e)
logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e)
None
}
}
@@ -214,9 +213,11 @@ class LogManager(logDirs: Array[File],
logStartOffset = logStartOffset,
recoveryPoint = logRecoveryPoint,
maxProducerIdExpirationMs = maxPidExpirationMs,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
scheduler = scheduler,
time = time,
brokerTopicStats = brokerTopicStats)
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
this.logsToBeDeleted.add(current)
@@ -237,7 +238,7 @@ class LogManager(logDirs: Array[File],
info("Loading logs.")
val startMs = time.milliseconds
val threadPools = ArrayBuffer.empty[ExecutorService]
val offlineDirs = ArrayBuffer.empty[String]
val offlineDirs = ArrayBuffer.empty[(String, IOException)]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
for (dir <- liveLogDirs) {
@@ -283,7 +284,7 @@ class LogManager(logDirs: Array[File],
loadLogs(logDir, recoveryPoints, logStartOffsets)
} catch {
case e: IOException =>
offlineDirs.append(dir.getAbsolutePath)
offlineDirs.append((dir.getAbsolutePath, e))
error("Error while loading log dir " + dir.getAbsolutePath, e)
}
}
@@ -291,7 +292,7 @@ class LogManager(logDirs: Array[File],
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
} catch {
case e: IOException =>
offlineDirs.append(dir.getAbsolutePath)
offlineDirs.append((dir.getAbsolutePath, e))
error("Error while loading log dir " + dir.getAbsolutePath, e)
}
}
@@ -303,11 +304,13 @@ class LogManager(logDirs: Array[File],
cleanShutdownFile.delete()
} catch {
case e: IOException =>
offlineDirs.append(cleanShutdownFile.getParent)
offlineDirs.append((cleanShutdownFile.getParent, e))
error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
}
}
offlineDirs.foreach(logDirFailureChannel.maybeAddLogFailureEvent)
offlineDirs.foreach { case (dir, e) =>
logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
}
} catch {
case e: ExecutionException => {
error("There was an error in one of the threads during logs loading: " + e.getCause)
@@ -500,8 +503,7 @@ class LogManager(logDirs: Array[File],
this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint)))
} catch {
case e: IOException =>
error(s"Disk error while writing to recovery point file in directory $dir", e)
logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point file in directory $dir", e)
}
}
}
@@ -518,8 +520,7 @@ class LogManager(logDirs: Array[File],
))
} catch {
case e: IOException =>
error(s"Disk error while writing to logStartOffset file in directory $dir", e)
logDirFailureChannel.maybeAddLogFailureEvent(dir.getAbsolutePath)
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e)
}
}
}
@@ -555,9 +556,12 @@ class LogManager(logDirs: Array[File],
logStartOffset = 0L,
recoveryPoint = 0L,
maxProducerIdExpirationMs = maxPidExpirationMs,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
scheduler = scheduler,
time = time,
brokerTopicStats = brokerTopicStats)
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel)
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
@@ -568,8 +572,9 @@ class LogManager(logDirs: Array[File],
log
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddLogFailureEvent(dataDir.getAbsolutePath)
throw new KafkaStorageException(s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}", e)
val msg = s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(dataDir.getAbsolutePath, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}
@@ -635,8 +640,9 @@ class LogManager(logDirs: Array[File],
}
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddLogFailureEvent(removedLog.dir.getParent)
throw new KafkaStorageException(s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}.", e)
val msg = s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}."
logDirFailureChannel.maybeAddOfflineLogDir(removedLog.dir.getParent, msg, e)
throw new KafkaStorageException(msg, e)
}
} else if (offlineLogDirs.nonEmpty) {
throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
@@ -727,6 +733,9 @@ class LogManager(logDirs: Array[File],
}
object LogManager {
val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000
def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String],
zkUtils: ZkUtils,

core/src/main/scala/kafka/server/LogDirFailureChannel.scala (28 lines changed)

@@ -18,29 +18,35 @@
package kafka.server
import java.io.IOException
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import kafka.utils.Logging
/*
* LogDirFailureChannel allows an external thread to block waiting for new offline log dir.
* LogDirFailureChannel allows an external thread to block waiting for new offline log dirs.
*
* LogDirFailureChannel should be a singleton object which can be accessed by any class that does disk-IO operation.
* If IOException is encountered while accessing a log directory, the corresponding class can insert the the log directory name
* to the LogDirFailureChannel using maybeAddLogFailureEvent(). Then a thread which is blocked waiting for new offline log directories
* can take the name of the new offline log directory out of the LogDirFailureChannel and handles the log failure properly.
* There should be a single instance of LogDirFailureChannel accessible by any class that does disk-IO operation.
* If IOException is encountered while accessing a log directory, the corresponding class can add the log directory name
* to the LogDirFailureChannel using maybeAddOfflineLogDir(). Each log directory will be added only once. After a log
* directory is added for the first time, a thread which is blocked waiting for new offline log directories
* can take the name of the new offline log directory out of the LogDirFailureChannel and handle the log failure properly.
* An offline log directory will stay offline until the broker is restarted.
*
*/
class LogDirFailureChannel(logDirNum: Int) {
class LogDirFailureChannel(logDirNum: Int) extends Logging {
private val offlineLogDirs = new ConcurrentHashMap[String, String]
private val logDirFailureEvent = new ArrayBlockingQueue[String](logDirNum)
private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)
/*
* If the given logDir is not already offline, add it to the
* set of offline log dirs and enqueue it to the logDirFailureEvent queue
*/
def maybeAddLogFailureEvent(logDir: String): Unit = {
def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
error(msg, e)
if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) {
logDirFailureEvent.add(logDir)
offlineLogDirQueue.add(logDir)
}
}
@@ -48,8 +54,6 @@ class LogDirFailureChannel(logDirNum: Int) {
* Get the next offline log dir from logDirFailureEvent queue.
* The method will wait if necessary until a new offline log directory becomes available
*/
def takeNextLogFailureEvent(): String = {
logDirFailureEvent.take()
}
def takeNextOfflineLogDir(): String = offlineLogDirQueue.take()
}
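To make the contract above concrete: disk-IO code reports a failed directory via maybeAddOfflineLogDir (each directory is enqueued at most once), while a single handler thread blocks on takeNextOfflineLogDir. A minimal sketch of both sides, assuming the LogDirFailureChannel class from this patch is on the classpath (the handler thread and the paths are illustrative only):

```scala
import java.io.IOException
import kafka.server.LogDirFailureChannel

object LogDirFailureChannelSketch extends App {
  val channel = new LogDirFailureChannel(2)

  // Consumer side: a dedicated thread blocks until some IO path reports a failure.
  val handler = new Thread(new Runnable {
    def run(): Unit = {
      val offlineDir = channel.takeNextOfflineLogDir() // blocks until a dir is enqueued
      println(s"handling offline log dir: $offlineDir")
    }
  })
  handler.start()

  // Producer side: any disk-IO code path reports the directory when it hits an IOException.
  try throw new IOException("simulated disk failure")
  catch {
    case e: IOException =>
      channel.maybeAddOfflineLogDir("/data/kafka-logs-1", "Error writing to /data/kafka-logs-1", e)
      // Reporting the same directory again is a no-op; it is enqueued only once.
      channel.maybeAddOfflineLogDir("/data/kafka-logs-1", "duplicate report", e)
  }
  handler.join()
}
```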

core/src/main/scala/kafka/server/ReplicaManager.scala (2 lines changed)

@@ -189,7 +189,7 @@ class ReplicaManager(val config: KafkaConfig,
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
override def doWork() {
val newOfflineLogDir = logDirFailureChannel.takeNextLogFailureEvent()
val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
if (haltBrokerOnDirFailure) {
fatal(s"Halting broker because dir $newOfflineLogDir is offline")
Exit.halt(1)

core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala (10 lines changed)

@@ -72,8 +72,9 @@ class CheckpointFile[T](val file: File,
Utils.atomicMoveWithFallback(tempPath, path)
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddLogFailureEvent(logDir)
throw new KafkaStorageException(s"Error while writing to checkpoint file ${file.getAbsolutePath}", e)
val msg = s"Error while writing to checkpoint file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}
@@ -119,8 +120,9 @@ class CheckpointFile[T](val file: File,
}
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddLogFailureEvent(logDir)
throw new KafkaStorageException(s"Error while reading checkpoint file ${file.getAbsolutePath}", e)
val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}

core/src/test/scala/integration/kafka/api/LogDirFailureTest.scala (34 lines changed)

@@ -34,6 +34,9 @@ import org.junit.Assert.assertTrue
* Test whether clients can produce and consume when there is a log directory failure
*/
class LogDirFailureTest extends IntegrationTestHarness {
import kafka.api.LogDirFailureTest._
val producerCount: Int = 1
val consumerCount: Int = 1
val serverCount: Int = 2
@@ -42,7 +45,8 @@ class LogDirFailureTest extends IntegrationTestHarness {
this.logDirCount = 2
this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "100")
this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000")
@Before
override def setUp() {
@@ -51,8 +55,16 @@ class LogDirFailureTest extends IntegrationTestHarness {
}
@Test
def testProduceAfterLogDirFailure() {
def testIOExceptionDuringLogRoll() {
testProduceAfterLogDirFailure(Roll)
}
@Test
def testIOExceptionDuringCheckpoint() {
testProduceAfterLogDirFailure(Checkpoint)
}
def testProduceAfterLogDirFailure(failureType: LogDirFailureType) {
val consumer = consumers.head
subscribeAndWaitForAssignment(topic, consumer)
val producer = producers.head
@@ -75,6 +87,17 @@ class LogDirFailureTest extends IntegrationTestHarness {
logDir.createNewFile()
assertTrue(logDir.isFile)
if (failureType == Roll) {
try {
leaderServer.replicaManager.getLog(partition).get.roll()
fail("Log rolling should fail with KafkaStorageException")
} catch {
case e: KafkaStorageException => // This is expected
}
} else if (failureType == Checkpoint) {
leaderServer.replicaManager.checkpointHighWatermarks()
}
// Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
TestUtils.waitUntilTrue(() => !leaderServer.logManager.liveLogDirs.contains(logDir), "Expected log directory offline", 3000L)
assertTrue(leaderServer.replicaManager.getReplica(partition).isEmpty)
@@ -123,3 +146,10 @@ class LogDirFailureTest extends IntegrationTestHarness {
}
}
object LogDirFailureTest {
sealed trait LogDirFailureType
case object Roll extends LogDirFailureType
case object Checkpoint extends LogDirFailureType
}

core/src/test/scala/other/kafka/StressTestLog.scala (7 lines changed)

@@ -21,7 +21,7 @@ import java.util.Properties
import java.util.concurrent.atomic._
import kafka.log._
import kafka.server.BrokerTopicStats
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
import org.apache.kafka.common.record.FileRecords
@@ -49,7 +49,10 @@ object StressTestLog {
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time,
brokerTopicStats = new BrokerTopicStats)
maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(10))
val writer = new WriterThread(log)
writer.start()
val reader = new ReaderThread(log)

core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (5 lines changed)

@@ -25,7 +25,7 @@ import java.util.{Properties, Random}
import joptsimple._
import kafka.log._
import kafka.message._
import kafka.server.BrokerTopicStats
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
@@ -207,7 +207,8 @@ object TestLinearWriteSpeed {
class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
Utils.delete(dir)
val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM)
val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM, 60 * 60 * 1000,
LogManager.ProducerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10))
def write(): Int = {
log.appendAsLeader(messages, leaderEpoch = 0)
messages.sizeInBytes

core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala (9 lines changed)

@@ -18,8 +18,8 @@ package kafka.cluster
import java.util.Properties
import kafka.log.{Log, LogConfig}
import kafka.server.{BrokerTopicStats, LogOffsetMetadata}
import kafka.log.{Log, LogConfig, LogManager}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel, LogOffsetMetadata}
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.OffsetOutOfRangeException
@@ -49,7 +49,10 @@ class ReplicaTest {
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
time = time,
maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10))
replica = new Replica(brokerId = 0,
topicPartition = new TopicPartition("foo", 0),

core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala (5 lines changed)

@@ -97,7 +97,10 @@ abstract class AbstractLogCleanerIntegrationTest {
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time,
brokerTopicStats = new BrokerTopicStats)
brokerTopicStats = new BrokerTopicStats,
maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10))
logMap.put(partition, log)
this.logs += log
}

core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala (6 lines changed)

@@ -29,7 +29,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
import org.apache.kafka.common.utils.Utils
import java.util.{Collection, Properties}
import kafka.server.BrokerTopicStats
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import scala.collection.JavaConverters._
@@ -56,7 +56,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
/*configure broker-side compression */
val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats)
time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10))
/* append two messages */
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,

core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala (11 lines changed)

@@ -20,7 +20,7 @@ package kafka.log
import java.io.File
import java.util.Properties
import kafka.server.BrokerTopicStats
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record._
@@ -236,13 +236,18 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time,
brokerTopicStats = new BrokerTopicStats)
brokerTopicStats = new BrokerTopicStats,
maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10))
log
}
private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats)
time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10))
private def records(key: Int, value: Int, timestamp: Long) =
MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (6 lines changed)

@@ -23,7 +23,7 @@ import java.nio.file.Paths
import java.util.Properties
import kafka.common._
import kafka.server.BrokerTopicStats
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record._
@@ -1238,7 +1238,9 @@ class LogCleanerTest extends JUnitSuite {
private def makeLog(dir: File = dir, config: LogConfig = logConfig, recoveryPoint: Long = 0L) =
Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats)
time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
logDirFailureChannel = new LogDirFailureChannel(10))
private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ }

core/src/test/scala/unit/kafka/log/LogTest.scala (794 lines changed)

File diff suppressed because it is too large.