From 8134adcf91bd20d4f9def54c8b030ff392fad616 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Tue, 13 Jul 2021 17:06:18 -0700 Subject: [PATCH] KAFKA-13073: Fix MockLog snapshot implementation (#11032) Fix a simulation test failure by: 1. Relaxing the validation of the snapshot id against the log start offset when the state machine attempts to create a new snapshot. It is safe to just ignore the request instead of throwing an exception when the snapshot id is less than the log start offset. 2. Fixing the MockLog implementation so that it uses startOffset both externally and internally. Reviewers: Colin P. McCabe --- .../scala/kafka/raft/KafkaMetadataLog.scala | 13 ++--- .../kafka/raft/KafkaMetadataLogTest.scala | 5 +- raft/config/kraft-log4j.properties | 1 + .../org/apache/kafka/raft/ReplicatedLog.java | 8 +-- .../java/org/apache/kafka/raft/MockLog.java | 58 ++++++++++++------- .../org/apache/kafka/raft/MockLogTest.java | 8 +-- .../kafka/raft/RaftClientTestContext.java | 4 +- .../kafka/raft/RaftEventSimulationTest.java | 31 ++++++---- 8 files changed, 74 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 51e95d7ee3f..23c8f2450fb 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -235,16 +235,15 @@ final class KafkaMetadataLog private ( } override def createNewSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = { - val highWatermarkOffset = highWatermark.offset - if (snapshotId.offset > highWatermarkOffset) { - throw new IllegalArgumentException( - s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)" - ) + if (snapshotId.offset < startOffset) { + info(s"Cannot create a snapshot with an id ($snapshotId) less than the log start offset ($startOffset)") + return 
Optional.empty() } - if (snapshotId.offset < startOffset) { + val highWatermarkOffset = highWatermark.offset + if (snapshotId.offset > highWatermarkOffset) { throw new IllegalArgumentException( - s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)" + s"Cannot create a snapshot with an id ($snapshotId) greater than the high-watermark ($highWatermarkOffset)" ) } diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index 5fb4b1214ee..415b69434c1 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala @@ -193,10 +193,7 @@ final class KafkaMetadataLogTest { // Simulate log cleanup that advances the LSO log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, SegmentDeletion) - assertThrows( - classOf[IllegalArgumentException], - () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch)) - ) + assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch))) } @Test diff --git a/raft/config/kraft-log4j.properties b/raft/config/kraft-log4j.properties index 87f4fcc7502..14f739af05d 100644 --- a/raft/config/kraft-log4j.properties +++ b/raft/config/kraft-log4j.properties @@ -21,3 +21,4 @@ log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.appender.stderr.Target=System.err log4j.logger.org.apache.kafka.raft=INFO +log4j.logger.org.apache.kafka.snapshot=INFO diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java index 1b49b3a6706..e8b2957aa02 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java @@ -236,17 +236,17 @@ public interface ReplicatedLog extends AutoCloseable { * * See {@link RawSnapshotWriter} for 
details on how to use this object. The caller of * this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a - * snapshot already exists then return an {@link Optional#empty()}. + * snapshot already exists or it is less than log start offset then return an + * {@link Optional#empty()}. * * Snapshots created using this method will be validated against the existing snapshots * and the replicated log. * * @param snapshotId the end offset and epoch that identifies the snapshot - * @return a writable snapshot if it doesn't already exists + * @return a writable snapshot if it doesn't already exists and greater than the log start + * offset * @throws IllegalArgumentException if validate is true and end offset is greater than the * high-watermark - * @throws IllegalArgumentException if validate is true and end offset is less than the log - * start offset */ Optional createNewSnapshot(OffsetAndEpoch snapshotId); diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java index fb2168bd72d..726f184ad2b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java @@ -27,11 +27,13 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.snapshot.RawSnapshotReader; -import org.apache.kafka.snapshot.RawSnapshotWriter; import org.apache.kafka.snapshot.MockRawSnapshotReader; import org.apache.kafka.snapshot.MockRawSnapshotWriter; +import org.apache.kafka.snapshot.RawSnapshotReader; +import org.apache.kafka.snapshot.RawSnapshotWriter; +import org.slf4j.Logger; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -59,14 +61,20 @@ public class MockLog implements 
ReplicatedLog { private final NavigableMap snapshots = new TreeMap<>(); private final TopicPartition topicPartition; private final Uuid topicId; + private final Logger logger; private long nextId = ID_GENERATOR.getAndIncrement(); private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0, Optional.empty()); private long lastFlushedOffset = 0; - public MockLog(TopicPartition topicPartition, Uuid topicId) { + public MockLog( + TopicPartition topicPartition, + Uuid topicId, + LogContext logContext + ) { this.topicPartition = topicPartition; this.topicId = topicId; + this.logger = logContext.logger(MockLog.class); } @Override @@ -218,17 +226,25 @@ public class MockLog implements ReplicatedLog { @Override public LogOffsetMetadata endOffset() { - long nextOffset = lastEntry().map(entry -> entry.offset + 1).orElse(logStartOffset()); + long nextOffset = lastEntry() + .map(entry -> entry.offset + 1) + .orElse( + latestSnapshotId() + .map(id -> id.offset) + .orElse(0L) + ); return new LogOffsetMetadata(nextOffset, Optional.of(new MockOffsetMetadata(nextId))); } @Override public long startOffset() { - return firstEntry().map(entry -> entry.offset).orElse(logStartOffset()); - } - - private long logStartOffset() { - return earliestSnapshotId().map(id -> id.offset).orElse(0L); + return firstEntry() + .map(entry -> entry.offset) + .orElse( + earliestSnapshotId() + .map(id -> id.offset) + .orElse(0L) + ); } private List buildEntries(RecordBatch batch, Function offsetSupplier) { @@ -420,6 +436,16 @@ public class MockLog implements ReplicatedLog { @Override public Optional createNewSnapshot(OffsetAndEpoch snapshotId) { + if (snapshotId.offset < startOffset()) { + logger.info( + "Cannot create a snapshot with an id ({}) less than the log start offset ({})", + snapshotId, + startOffset() + ); + + return Optional.empty(); + } + long highWatermarkOffset = highWatermark().offset; if (snapshotId.offset > highWatermarkOffset) { throw new IllegalArgumentException( @@ -431,16 +457,6 
@@ public class MockLog implements ReplicatedLog { ); } - if (snapshotId.offset < logStartOffset()) { - throw new IllegalArgumentException( - String.format( - "Cannot create a snapshot with and id (%s) less than the log start offset (%s)", - snapshotId, - logStartOffset() - ) - ); - } - ValidOffsetAndEpoch validOffsetAndEpoch = validateOffsetAndEpoch(snapshotId.offset, snapshotId.epoch); if (validOffsetAndEpoch.kind() != ValidOffsetAndEpoch.Kind.VALID) { throw new IllegalArgumentException( @@ -490,12 +506,12 @@ public class MockLog implements ReplicatedLog { @Override public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { - if (logStartOffset() > snapshotId.offset) { + if (startOffset() > snapshotId.offset) { throw new OffsetOutOfRangeException( String.format( "New log start (%s) is less than the curent log start offset (%s)", snapshotId, - logStartOffset() + startOffset() ) ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java index 404c8d46b31..1b2caca0fac 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.snapshot.RawSnapshotReader; import org.apache.kafka.snapshot.RawSnapshotWriter; @@ -57,7 +58,7 @@ public class MockLogTest { @BeforeEach public void setup() { - log = new MockLog(topicPartition, topicId); + log = new MockLog(topicPartition, topicId, new LogContext()); } @AfterEach @@ -510,10 +511,7 @@ public class MockLogTest { assertTrue(log.deleteBeforeSnapshot(snapshotId)); assertEquals(snapshotId.offset, log.startOffset()); - assertThrows( - 
IllegalArgumentException.class, - () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch)) - ); + assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch))); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index ffd87c9050f..ef5b724f160 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -129,7 +129,8 @@ public final class RaftClientTestContext { private final MockTime time = new MockTime(); private final QuorumStateStore quorumStateStore = new MockQuorumStateStore(); private final Random random = Mockito.spy(new Random(1)); - private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID); + private final LogContext logContext = new LogContext(); + private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext); private final Set voters; private final OptionalInt localId; @@ -223,7 +224,6 @@ public final class RaftClientTestContext { public RaftClientTestContext build() throws IOException { Metrics metrics = new Metrics(time); MockNetworkChannel channel = new MockNetworkChannel(voters); - LogContext logContext = new LogContext(); MockListener listener = new MockListener(localId); Map voterAddressMap = voters.stream() .collect(Collectors.toMap(id -> id, RaftClientTestContext::mockAddress)); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 739785a5f1f..d69fcc110ee 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.raft; +import net.jqwik.api.AfterFailureMode; import 
net.jqwik.api.ForAll; import net.jqwik.api.Property; import net.jqwik.api.Tag; @@ -103,7 +104,7 @@ public class RaftEventSimulationTest { private static final int FETCH_MAX_WAIT_MS = 100; private static final int LINGER_MS = 0; - @Property(tries = 100) + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) void canElectInitialLeader( @ForAll int seed, @ForAll @IntRange(min = 1, max = 5) int numVoters, @@ -122,7 +123,7 @@ public class RaftEventSimulationTest { scheduler.runUntil(() -> cluster.allReachedHighWatermark(10)); } - @Property(tries = 100) + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) void canElectNewLeaderAfterOldLeaderFailure( @ForAll int seed, @ForAll @IntRange(min = 3, max = 5) int numVoters, @@ -162,7 +163,7 @@ public class RaftEventSimulationTest { scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10)); } - @Property(tries = 100) + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) void canRecoverAfterAllNodesKilled( @ForAll int seed, @ForAll @IntRange(min = 1, max = 5) int numVoters, @@ -195,7 +196,7 @@ public class RaftEventSimulationTest { scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10)); } - @Property(tries = 100) + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) void canElectNewLeaderAfterOldLeaderPartitionedAway( @ForAll int seed, @ForAll @IntRange(min = 3, max = 5) int numVoters, @@ -227,7 +228,7 @@ public class RaftEventSimulationTest { scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, nonPartitionedNodes)); } - @Property(tries = 100) + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) void canMakeProgressIfMajorityIsReachable( @ForAll int seed, @ForAll @IntRange(min = 0, max = 3) int numObservers @@ -272,7 +273,7 @@ public class RaftEventSimulationTest { scheduler.runUntil(() -> cluster.allReachedHighWatermark(30)); } - @Property(tries = 100) + @Property(tries = 100, afterFailure = 
AfterFailureMode.SAMPLE_ONLY) void canMakeProgressAfterBackToBackLeaderFailures( @ForAll int seed, @ForAll @IntRange(min = 3, max = 5) int numVoters, @@ -305,7 +306,7 @@ public class RaftEventSimulationTest { scheduler.runUntil(() -> cluster.anyReachedHighWatermark(targetHighWatermark)); } - @Property(tries = 100) + @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) void canRecoverFromSingleNodeCommittedDataLoss( @ForAll int seed, @ForAll @IntRange(min = 3, max = 5) int numVoters, @@ -498,7 +499,15 @@ public class RaftEventSimulationTest { private static class PersistentState { final MockQuorumStateStore store = new MockQuorumStateStore(); - final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID); + final MockLog log; + + PersistentState(int nodeId) { + log = new MockLog( + METADATA_PARTITION, + Uuid.METADATA_TOPIC_ID, + new LogContext(String.format("[Node %s] ", nodeId)) + ); + } } private static class Cluster { @@ -516,11 +525,11 @@ public class RaftEventSimulationTest { int nodeId = 0; for (; nodeId < numVoters; nodeId++) { voters.add(nodeId); - nodes.put(nodeId, new PersistentState()); + nodes.put(nodeId, new PersistentState(nodeId)); } for (; nodeId < numVoters + numObservers; nodeId++) { - nodes.put(nodeId, new PersistentState()); + nodes.put(nodeId, new PersistentState(nodeId)); } } @@ -674,7 +683,7 @@ public class RaftEventSimulationTest { void killAndDeletePersistentState(int nodeId) { kill(nodeId); - nodes.put(nodeId, new PersistentState()); + nodes.put(nodeId, new PersistentState(nodeId)); } private static RaftConfig.AddressSpec nodeAddress(int id) {