
MINOR: Log more information when producer snapshot is written (#10757)

This patch logs more information when a producer snapshot is written to disk: the snapshot offset, the number of producer ids in the snapshot, and the time taken to write it.
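In short, the change threads a Time instance into ProducerStateManager (defaulting to Time.SYSTEM) and uses Time.hiResClockMs() to measure how long the snapshot write takes, so the info log can report the snapshot offset, the producer-id count, and the elapsed milliseconds. A minimal standalone sketch of that timing pattern, assuming a made-up writeSnapshotFile helper and placeholder data rather than Kafka's real snapshot writer:

    import java.nio.file.{Files, Paths}
    import org.apache.kafka.common.utils.Time

    object SnapshotTimingSketch {
      // Stand-in for ProducerStateManager.writeSnapshot: persists a toy map to disk.
      def writeSnapshotFile(path: String, producers: Map[Long, String]): Unit =
        Files.write(Paths.get(path), producers.mkString("\n").getBytes("UTF-8"))

      def main(args: Array[String]): Unit = {
        val time = Time.SYSTEM
        val producers = Map(1L -> "entry-a", 2L -> "entry-b") // placeholder producer-id map
        val lastMapOffset = 42L                               // placeholder snapshot offset
        val start = time.hiResClockMs()
        writeSnapshotFile("producer.snapshot", producers)
        // Same shape as the new log line added to takeSnapshot() below.
        println(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} " +
          s"producer ids in ${time.hiResClockMs() - start} ms.")
      }
    }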

Reviewers: Ismael Juma <mlists@juma.me.uk>, Lucas Bradstreet <lucas@confluent.io>
Author: David Jacot (committed via GitHub)
Commit: 38e8391a77
Changed files:
  1. core/src/main/scala/kafka/log/Log.scala (2 changed lines)
  2. core/src/main/scala/kafka/log/LogLoader.scala (6 changed lines)
  3. core/src/main/scala/kafka/log/ProducerStateManager.scala (11 changed lines)
  4. core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala (2 changed lines)
  5. core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (2 changed lines)
  6. core/src/test/scala/unit/kafka/log/LogLoaderTest.scala (4 changed lines)
  7. core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala (18 changed lines)
  8. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (2 changed lines)
  9. core/src/test/scala/unit/kafka/utils/SchedulerTest.scala (2 changed lines)

core/src/main/scala/kafka/log/Log.scala (2 changed lines)

@@ -2011,7 +2011,7 @@ object Log extends Logging {
       logDirFailureChannel,
       config.messageFormatVersion.recordVersion,
       s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs, time)
     val offsets = LogLoader.load(LoadLogParams(
       dir,
       topicPartition,

core/src/main/scala/kafka/log/LogLoader.scala (6 changed lines)

@@ -324,7 +324,11 @@ object LogLoader extends Logging {
    * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
    */
   private def recoverSegment(segment: LogSegment, params: LoadLogParams): Int = {
-    val producerStateManager = new ProducerStateManager(params.topicPartition, params.dir, params.maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(
+      params.topicPartition,
+      params.dir,
+      params.maxProducerIdExpirationMs,
+      params.time)
     Log.rebuildProducerState(
       producerStateManager,
       params.segments,

core/src/main/scala/kafka/log/ProducerStateManager.scala (11 changed lines)

@@ -21,7 +21,6 @@ import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, StandardOpenOption}
 import java.util.concurrent.ConcurrentSkipListMap
 import kafka.log.Log.offsetFromFile
 import kafka.server.LogOffsetMetadata
 import kafka.utils.{Logging, nonthreadsafe, threadsafe}
@@ -29,6 +28,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
 import scala.jdk.CollectionConverters._
@@ -484,7 +484,8 @@ object ProducerStateManager {
 @nonthreadsafe
 class ProducerStateManager(val topicPartition: TopicPartition,
                            @volatile var _logDir: File,
-                           val maxProducerIdExpirationMs: Int = 60 * 60 * 1000) extends Logging {
+                           val maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
+                           val time: Time = Time.SYSTEM) extends Logging {
   import ProducerStateManager._
   import java.util
@@ -718,8 +719,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     // If not a new offset, then it is not worth taking another snapshot
     if (lastMapOffset > lastSnapOffset) {
       val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, lastMapOffset))
-      info(s"Writing producer snapshot at offset $lastMapOffset")
+      val start = time.hiResClockMs()
       writeSnapshot(snapshotFile.file, producers)
+      info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.")
       snapshots.put(snapshotFile.offset, snapshotFile)
       // Update the last snap offset according to the serialized map
@@ -730,7 +733,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   /**
    * Update the parentDir for this ProducerStateManager and all of the snapshot files which it manages.
    */
-  def updateParentDir(parentDir: File): Unit ={
+  def updateParentDir(parentDir: File): Unit = {
     _logDir = parentDir
     snapshots.forEach((_, s) => s.updateParentDir(parentDir))
   }
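Because the new time parameter defaults to Time.SYSTEM, existing call sites compile unchanged; the test updates below just pass the mock clock each suite already has, so the measured write latency comes from a controllable clock. A rough usage sketch, assuming org.apache.kafka.common.utils.MockTime and an illustrative topic partition and log directory:

    import java.io.File
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.utils.MockTime

    val tp = new TopicPartition("example-topic", 0)   // hypothetical partition
    val dir = new File("/tmp/example-topic-0")        // hypothetical log dir

    // Production-style: omit the time argument and fall back to Time.SYSTEM.
    val manager = new ProducerStateManager(tp, dir, maxProducerIdExpirationMs = 60 * 60 * 1000)

    // Test-style: inject a mock clock so the "... in N ms." measurement is deterministic.
    val mockTime = new MockTime()
    val testManager = new ProducerStateManager(tp, dir,
      maxProducerIdExpirationMs = 60 * 60 * 1000, time = mockTime)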

core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala (2 changed lines)

@@ -101,7 +101,7 @@ class LogCleanerManagerTest extends Logging {
     val maxProducerIdExpirationMs = 60 * 60 * 1000
     val segments = new LogSegments(tp)
     val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
-    val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs, time)
     val offsets = LogLoader.load(LoadLogParams(
       tpDir,
       tp,

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (2 changed lines)

@@ -106,7 +106,7 @@ class LogCleanerTest {
     val maxProducerIdExpirationMs = 60 * 60 * 1000
     val logSegments = new LogSegments(topicPartition)
     val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
-    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs, time)
     val offsets = LogLoader.load(LoadLogParams(
       dir,
       topicPartition,

core/src/test/scala/unit/kafka/log/LogLoaderTest.scala (4 changed lines)

@@ -95,7 +95,7 @@ class LogLoaderTest {
     val maxProducerIdExpirationMs = 60 * 60 * 1000
     val segments = new LogSegments(topicPartition)
     val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
-    val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, time)
     val loadLogParams = LoadLogParams(logDir, topicPartition, config, time.scheduler, time,
       logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
       maxProducerIdExpirationMs, leaderEpochCache, producerStateManager)
@@ -265,7 +265,7 @@ class LogLoaderTest {
       }
     }
     val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
-    val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, mockTime)
     val loadLogParams = LoadLogParams(
       logDir,
       topicPartition,

core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala (18 changed lines)

@@ -46,7 +46,7 @@ class ProducerStateManagerTest {
   @BeforeEach
   def setUp(): Unit = {
     logDir = TestUtils.tempDir()
-    stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
   }
   @AfterEach
@@ -467,7 +467,7 @@ class ProducerStateManagerTest {
     append(stateManager, producerId, epoch, 1, 1L, isTransactional = true)
     stateManager.takeSnapshot()
-    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
     recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)
     // The snapshot only persists the last appended batch metadata
@@ -490,7 +490,7 @@ class ProducerStateManagerTest {
     appendEndTxnMarker(stateManager, producerId, epoch, ControlRecordType.ABORT, offset = 2L)
     stateManager.takeSnapshot()
-    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
     recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)
     // The snapshot only persists the last appended batch metadata
@@ -510,7 +510,7 @@ class ProducerStateManagerTest {
       offset = 0L, timestamp = appendTimestamp)
     stateManager.takeSnapshot()
-    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
     recoveredMapping.truncateAndReload(logStartOffset = 0L, logEndOffset = 1L, time.milliseconds)
     val lastEntry = recoveredMapping.lastEntry(producerId)
@@ -542,7 +542,7 @@ class ProducerStateManagerTest {
     append(stateManager, producerId, epoch, 1, 1L, 1)
     stateManager.takeSnapshot()
-    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
     recoveredMapping.truncateAndReload(0L, 1L, 70000)
     // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence
@@ -561,7 +561,7 @@ class ProducerStateManagerTest {
     append(stateManager, producerId, epoch, 1, 1L, 1)
     stateManager.takeSnapshot()
-    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
     recoveredMapping.truncateAndReload(0L, 1L, 70000)
     val sequence = 2
@@ -769,7 +769,7 @@ class ProducerStateManagerTest {
   @Test
   def testSequenceNotValidatedForGroupMetadataTopic(): Unit = {
     val partition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
-    val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    val stateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
     val epoch = 0.toShort
     append(stateManager, producerId, epoch, RecordBatch.NO_SEQUENCE, offset = 99,
@@ -818,7 +818,7 @@ class ProducerStateManagerTest {
     appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
     stateManager.takeSnapshot()
-    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
     recoveredMapping.truncateAndReload(0L, 2L, 70000)
     // append from old coordinator should be rejected
@@ -922,7 +922,7 @@ class ProducerStateManagerTest {
     }
     // Ensure that the truncated snapshot is deleted and producer state is loaded from the previous snapshot
-    val reloadedStateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
+    val reloadedStateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs, time)
     reloadedStateManager.truncateAndReload(0L, 20L, time.milliseconds())
     assertFalse(snapshotToTruncate.exists())

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (2 changed lines)

@@ -1487,7 +1487,7 @@ class ReplicaManagerTest {
     val maxProducerIdExpirationMs = 30000
     val segments = new LogSegments(tp)
     val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
-    val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs, time)
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       tp,

core/src/test/scala/unit/kafka/utils/SchedulerTest.scala (2 changed lines)

@@ -124,7 +124,7 @@ class SchedulerTest {
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val segments = new LogSegments(topicPartition)
     val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
-    val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, mockTime)
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
