Browse Source

KAFKA-14073; Log the reason for snapshot (#12414)

When a snapshot is taken it is due to one or both of the following reasons:

    Max bytes were applied
    Metadata version was changed

Once the snapshot process is started, it will log the reason that initiated the process.

Updated existing tests to include code changes required to log the reason. I was not able to check the logs when running tests - could someone guide me on how to enable logs when running a specific test case.

Reviewers: dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
pull/12633/head
Ashmeet Lamba 2 years ago committed by GitHub
parent
commit
86645cb40a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
  2. 6
      core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
  3. 4
      core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
  4. 3
      core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
  5. 27
      core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
  6. 7
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  7. 34
      metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java
  8. 2
      raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java

28
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala

@ -22,6 +22,7 @@ import java.util.function.Consumer @@ -22,6 +22,7 @@ import java.util.function.Consumer
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
import org.apache.kafka.server.common.ApiMessageAndVersion
@ -141,16 +142,29 @@ class BrokerMetadataListener( @@ -141,16 +142,29 @@ class BrokerMetadataListener(
}
_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
if (shouldSnapshot()) {
maybeStartSnapshot()
val shouldTakeSnapshot: Set[SnapshotReason] = shouldSnapshot()
if (shouldTakeSnapshot.nonEmpty) {
maybeStartSnapshot(shouldTakeSnapshot)
}
_publisher.foreach(publish)
}
}
private def shouldSnapshot(): Boolean = {
(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()
private def shouldSnapshot(): Set[SnapshotReason] = {
val metadataVersionHasChanged = metadataVersionChanged()
val maxBytesHaveExceeded = (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots)
if (maxBytesHaveExceeded && metadataVersionHasChanged) {
Set(SnapshotReason.MetadataVersionChanged, SnapshotReason.MaxBytesExceeded)
} else if (maxBytesHaveExceeded) {
Set(SnapshotReason.MaxBytesExceeded)
} else if (metadataVersionHasChanged) {
Set(SnapshotReason.MetadataVersionChanged)
} else {
Set()
}
}
private def metadataVersionChanged(): Boolean = {
@ -161,11 +175,11 @@ class BrokerMetadataListener( @@ -161,11 +175,11 @@ class BrokerMetadataListener(
}
}
private def maybeStartSnapshot(): Unit = {
private def maybeStartSnapshot(reason: Set[SnapshotReason]): Unit = {
snapshotter.foreach { snapshotter =>
if (metadataFaultOccurred.get()) {
trace("Not starting metadata snapshot since we previously had an error")
} else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
} else if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply(), reason)) {
_bytesSinceLastSnapshot = 0L
}
}
@ -293,7 +307,7 @@ class BrokerMetadataListener( @@ -293,7 +307,7 @@ class BrokerMetadataListener(
log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
try {
if (metadataVersionChanged()) {
maybeStartSnapshot()
maybeStartSnapshot(Set(SnapshotReason.MetadataVersionChanged))
}
publish(publisher)
future.complete(null)

6
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala

@ -20,6 +20,7 @@ import java.util.concurrent.RejectedExecutionException @@ -20,6 +20,7 @@ import java.util.concurrent.RejectedExecutionException
import kafka.utils.Logging
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.SnapshotWriter
@ -82,7 +83,7 @@ class BrokerMetadataSnapshotter( @@ -82,7 +83,7 @@ class BrokerMetadataSnapshotter(
*/
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized {
override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage, snapshotReasons: Set[SnapshotReason]): Boolean = synchronized {
if (_currentSnapshotOffset != -1) {
info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
@ -95,7 +96,8 @@ class BrokerMetadataSnapshotter( @@ -95,7 +96,8 @@ class BrokerMetadataSnapshotter(
)
if (writer.nonEmpty) {
_currentSnapshotOffset = image.highestOffsetAndEpoch().offset
info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
info(s"Creating a new snapshot at offset ${_currentSnapshotOffset} because, ${snapshotReasons.mkString(" and ")}")
eventQueue.append(new CreateSnapshotEvent(image, writer.get))
true
} else {

4
core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package kafka.server.metadata
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.metadata.util.SnapshotReason
/**
@ -28,8 +29,9 @@ trait MetadataSnapshotter { @@ -28,8 +29,9 @@ trait MetadataSnapshotter {
*
* @param lastContainedLogTime The highest time contained in the snapshot.
* @param image The metadata image to write out.
* @param reason Set of reasons due to which a snapshot is being taken.
*
* @return True if we will write out a new snapshot; false otherwise.
*/
def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean
def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage, reason: Set[SnapshotReason]): Boolean
}

3
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala

@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.Metrics @@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
@ -137,7 +138,7 @@ class BrokerMetadataListenerTest { @@ -137,7 +138,7 @@ class BrokerMetadataListenerTest {
var prevCommittedEpoch = -1
var prevLastContainedLogTime = -1L
override def maybeStartSnapshot(lastContainedLogTime: Long, newImage: MetadataImage): Boolean = {
override def maybeStartSnapshot(lastContainedLogTime: Long, newImage: MetadataImage, reason: Set[SnapshotReason]): Boolean = {
try {
if (activeSnapshotOffset == -1L) {
assertTrue(prevCommittedOffset <= newImage.highestOffsetAndEpoch().offset)

27
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala

@ -27,6 +27,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords} @@ -27,6 +27,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.queue.EventQueue
import org.apache.kafka.raft.OffsetAndEpoch
import org.apache.kafka.server.common.ApiMessageAndVersion
@ -96,11 +97,33 @@ class BrokerMetadataSnapshotterTest { @@ -96,11 +97,33 @@ class BrokerMetadataSnapshotterTest {
def testCreateSnapshot(): Unit = {
val writerBuilder = new MockSnapshotWriterBuilder()
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
try {
val blockingEvent = new BlockingEvent()
val reasons = Set(SnapshotReason.UnknownReason)
snapshotter.eventQueue.append(blockingEvent)
assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1, reasons))
assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2, reasons))
blockingEvent.latch.countDown()
assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
} finally {
snapshotter.close()
}
}
@Test
def testCreateSnapshotMultipleReasons(): Unit = {
val writerBuilder = new MockSnapshotWriterBuilder()
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
try {
val blockingEvent = new BlockingEvent()
val reasons = Set(SnapshotReason.MaxBytesExceeded, SnapshotReason.MetadataVersionChanged)
snapshotter.eventQueue.append(blockingEvent)
assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1))
assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2))
assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1, reasons))
assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2, reasons))
blockingEvent.latch.countDown()
assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
} finally {

7
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@ -89,6 +89,7 @@ import org.apache.kafka.raft.BatchReader; @@ -89,6 +89,7 @@ import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.metadata.util.SnapshotReason;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -1465,8 +1466,8 @@ public final class QuorumController implements Controller { @@ -1465,8 +1466,8 @@ public final class QuorumController implements Controller {
snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
}
log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot.",
lastCommittedEpoch, lastCommittedOffset, newBytesSinceLastSnapshot);
log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
lastCommittedEpoch, lastCommittedOffset, newBytesSinceLastSnapshot, SnapshotReason.MaxBytesExceeded);
snapshotGeneratorManager.createSnapshotGenerator(lastCommittedOffset, lastCommittedEpoch, lastCommittedTimestamp);
newBytesSinceLastSnapshot = 0;
@ -2116,6 +2117,8 @@ public final class QuorumController implements Controller { @@ -2116,6 +2117,8 @@ public final class QuorumController implements Controller {
CompletableFuture<Long> future = new CompletableFuture<>();
appendControlEvent("beginWritingSnapshot", () -> {
if (snapshotGeneratorManager.generator == null) {
log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot because, {}.",
lastCommittedEpoch, lastCommittedOffset, newBytesSinceLastSnapshot, SnapshotReason.UnknownReason);
snapshotGeneratorManager.createSnapshotGenerator(
lastCommittedOffset,
lastCommittedEpoch,

34
metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotReason.java

@ -0,0 +1,34 @@ @@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.util;
public enum SnapshotReason {
UnknownReason("unknown reason"),
MaxBytesExceeded("max bytes were exceeded"),
MetadataVersionChanged("metadata version was changed");
private final String snapshotReason;
SnapshotReason(String snapshotReason) {
this.snapshotReason = snapshotReason;
}
@Override
public String toString() {
return snapshotReason;
}
}

2
raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java

@ -109,7 +109,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> { @@ -109,7 +109,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
if (lastOffsetSnapshotted + snapshotDelayInRecords < lastCommittedOffset) {
log.debug(
"Generating new snapshot with committed offset {} and epoch {} since the previoud snapshot includes {}",
"Generating new snapshot with committed offset {} and epoch {} since the previous snapshot includes {}",
lastCommittedOffset,
lastCommittedEpoch,
lastOffsetSnapshotted

Loading…
Cancel
Save