Browse Source

KAFKA-13073: Fix MockLog snapshot implementation (#11032)

Fix a simulation test failure by:

1. Relaxing the valiation of the snapshot id against the log start
offset when the state machine attempts to create new snapshot. It
is safe to just ignore the request instead of throwing an exception
when the snapshot id is less that the log start offset.

2. Fixing the MockLog implementation so that it uses startOffset both
externally and internally.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
pull/11043/head
José Armando García Sancio 3 years ago committed by GitHub
parent
commit
8134adcf91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
  2. 5
      core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
  3. 1
      raft/config/kraft-log4j.properties
  4. 8
      raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
  5. 58
      raft/src/test/java/org/apache/kafka/raft/MockLog.java
  6. 8
      raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
  7. 4
      raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
  8. 31
      raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java

13
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala

@ -235,16 +235,15 @@ final class KafkaMetadataLog private ( @@ -235,16 +235,15 @@ final class KafkaMetadataLog private (
}
override def createNewSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
val highWatermarkOffset = highWatermark.offset
if (snapshotId.offset > highWatermarkOffset) {
throw new IllegalArgumentException(
s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
)
if (snapshotId.offset < startOffset) {
info(s"Cannot create a snapshot with an id ($snapshotId) less than the log start offset ($startOffset)")
return Optional.empty()
}
if (snapshotId.offset < startOffset) {
val highWatermarkOffset = highWatermark.offset
if (snapshotId.offset > highWatermarkOffset) {
throw new IllegalArgumentException(
s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)"
s"Cannot create a snapshot with an id ($snapshotId) greater than the high-watermark ($highWatermarkOffset)"
)
}

5
core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala

@ -193,10 +193,7 @@ final class KafkaMetadataLogTest { @@ -193,10 +193,7 @@ final class KafkaMetadataLogTest {
// Simulate log cleanup that advances the LSO
log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, SegmentDeletion)
assertThrows(
classOf[IllegalArgumentException],
() => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch))
)
assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch)))
}
@Test

1
raft/config/kraft-log4j.properties

@ -21,3 +21,4 @@ log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n @@ -21,3 +21,4 @@ log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err
log4j.logger.org.apache.kafka.raft=INFO
log4j.logger.org.apache.kafka.snapshot=INFO

8
raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java

@ -236,17 +236,17 @@ public interface ReplicatedLog extends AutoCloseable { @@ -236,17 +236,17 @@ public interface ReplicatedLog extends AutoCloseable {
*
* See {@link RawSnapshotWriter} for details on how to use this object. The caller of
* this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a
* snapshot already exists then return an {@link Optional#empty()}.
* snapshot already exists or it is less than log start offset then return an
* {@link Optional#empty()}.
*
* Snapshots created using this method will be validated against the existing snapshots
* and the replicated log.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
* @return a writable snapshot if it doesn't already exists
* @return a writable snapshot if it doesn't already exists and greater than the log start
* offset
* @throws IllegalArgumentException if validate is true and end offset is greater than the
* high-watermark
* @throws IllegalArgumentException if validate is true and end offset is less than the log
* start offset
*/
Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId);

58
raft/src/test/java/org/apache/kafka/raft/MockLog.java

@ -27,11 +27,13 @@ import org.apache.kafka.common.record.RecordBatch; @@ -27,11 +27,13 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -59,14 +61,20 @@ public class MockLog implements ReplicatedLog { @@ -59,14 +61,20 @@ public class MockLog implements ReplicatedLog {
private final NavigableMap<OffsetAndEpoch, MockRawSnapshotReader> snapshots = new TreeMap<>();
private final TopicPartition topicPartition;
private final Uuid topicId;
private final Logger logger;
private long nextId = ID_GENERATOR.getAndIncrement();
private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0, Optional.empty());
private long lastFlushedOffset = 0;
public MockLog(TopicPartition topicPartition, Uuid topicId) {
public MockLog(
TopicPartition topicPartition,
Uuid topicId,
LogContext logContext
) {
this.topicPartition = topicPartition;
this.topicId = topicId;
this.logger = logContext.logger(MockLog.class);
}
@Override
@ -218,17 +226,25 @@ public class MockLog implements ReplicatedLog { @@ -218,17 +226,25 @@ public class MockLog implements ReplicatedLog {
@Override
public LogOffsetMetadata endOffset() {
long nextOffset = lastEntry().map(entry -> entry.offset + 1).orElse(logStartOffset());
long nextOffset = lastEntry()
.map(entry -> entry.offset + 1)
.orElse(
latestSnapshotId()
.map(id -> id.offset)
.orElse(0L)
);
return new LogOffsetMetadata(nextOffset, Optional.of(new MockOffsetMetadata(nextId)));
}
@Override
public long startOffset() {
return firstEntry().map(entry -> entry.offset).orElse(logStartOffset());
}
private long logStartOffset() {
return earliestSnapshotId().map(id -> id.offset).orElse(0L);
return firstEntry()
.map(entry -> entry.offset)
.orElse(
earliestSnapshotId()
.map(id -> id.offset)
.orElse(0L)
);
}
private List<LogEntry> buildEntries(RecordBatch batch, Function<Record, Long> offsetSupplier) {
@ -420,6 +436,16 @@ public class MockLog implements ReplicatedLog { @@ -420,6 +436,16 @@ public class MockLog implements ReplicatedLog {
@Override
public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId) {
if (snapshotId.offset < startOffset()) {
logger.info(
"Cannot create a snapshot with an id ({}) less than the log start offset ({})",
snapshotId,
startOffset()
);
return Optional.empty();
}
long highWatermarkOffset = highWatermark().offset;
if (snapshotId.offset > highWatermarkOffset) {
throw new IllegalArgumentException(
@ -431,16 +457,6 @@ public class MockLog implements ReplicatedLog { @@ -431,16 +457,6 @@ public class MockLog implements ReplicatedLog {
);
}
if (snapshotId.offset < logStartOffset()) {
throw new IllegalArgumentException(
String.format(
"Cannot create a snapshot with and id (%s) less than the log start offset (%s)",
snapshotId,
logStartOffset()
)
);
}
ValidOffsetAndEpoch validOffsetAndEpoch = validateOffsetAndEpoch(snapshotId.offset, snapshotId.epoch);
if (validOffsetAndEpoch.kind() != ValidOffsetAndEpoch.Kind.VALID) {
throw new IllegalArgumentException(
@ -490,12 +506,12 @@ public class MockLog implements ReplicatedLog { @@ -490,12 +506,12 @@ public class MockLog implements ReplicatedLog {
@Override
public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
if (logStartOffset() > snapshotId.offset) {
if (startOffset() > snapshotId.offset) {
throw new OffsetOutOfRangeException(
String.format(
"New log start (%s) is less than the curent log start offset (%s)",
snapshotId,
logStartOffset()
startOffset()
)
);
}

8
raft/src/test/java/org/apache/kafka/raft/MockLogTest.java

@ -27,6 +27,7 @@ import org.apache.kafka.common.record.Record; @@ -27,6 +27,7 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
@ -57,7 +58,7 @@ public class MockLogTest { @@ -57,7 +58,7 @@ public class MockLogTest {
@BeforeEach
public void setup() {
log = new MockLog(topicPartition, topicId);
log = new MockLog(topicPartition, topicId, new LogContext());
}
@AfterEach
@ -510,10 +511,7 @@ public class MockLogTest { @@ -510,10 +511,7 @@ public class MockLogTest {
assertTrue(log.deleteBeforeSnapshot(snapshotId));
assertEquals(snapshotId.offset, log.startOffset());
assertThrows(
IllegalArgumentException.class,
() -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch))
);
assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch)));
}
@Test

4
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java

@ -129,7 +129,8 @@ public final class RaftClientTestContext { @@ -129,7 +129,8 @@ public final class RaftClientTestContext {
private final MockTime time = new MockTime();
private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
private final Random random = Mockito.spy(new Random(1));
private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID);
private final LogContext logContext = new LogContext();
private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext);
private final Set<Integer> voters;
private final OptionalInt localId;
@ -223,7 +224,6 @@ public final class RaftClientTestContext { @@ -223,7 +224,6 @@ public final class RaftClientTestContext {
public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel(voters);
LogContext logContext = new LogContext();
MockListener listener = new MockListener(localId);
Map<Integer, RaftConfig.AddressSpec> voterAddressMap = voters.stream()
.collect(Collectors.toMap(id -> id, RaftClientTestContext::mockAddress));

31
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.raft;
import net.jqwik.api.AfterFailureMode;
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.Tag;
@ -103,7 +104,7 @@ public class RaftEventSimulationTest { @@ -103,7 +104,7 @@ public class RaftEventSimulationTest {
private static final int FETCH_MAX_WAIT_MS = 100;
private static final int LINGER_MS = 0;
@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectInitialLeader(
@ForAll int seed,
@ForAll @IntRange(min = 1, max = 5) int numVoters,
@ -122,7 +123,7 @@ public class RaftEventSimulationTest { @@ -122,7 +123,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
}
@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectNewLeaderAfterOldLeaderFailure(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
@ -162,7 +163,7 @@ public class RaftEventSimulationTest { @@ -162,7 +163,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
}
@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canRecoverAfterAllNodesKilled(
@ForAll int seed,
@ForAll @IntRange(min = 1, max = 5) int numVoters,
@ -195,7 +196,7 @@ public class RaftEventSimulationTest { @@ -195,7 +196,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
}
@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectNewLeaderAfterOldLeaderPartitionedAway(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
@ -227,7 +228,7 @@ public class RaftEventSimulationTest { @@ -227,7 +228,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, nonPartitionedNodes));
}
@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canMakeProgressIfMajorityIsReachable(
@ForAll int seed,
@ForAll @IntRange(min = 0, max = 3) int numObservers
@ -272,7 +273,7 @@ public class RaftEventSimulationTest { @@ -272,7 +273,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.allReachedHighWatermark(30));
}
@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canMakeProgressAfterBackToBackLeaderFailures(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
@ -305,7 +306,7 @@ public class RaftEventSimulationTest { @@ -305,7 +306,7 @@ public class RaftEventSimulationTest {
scheduler.runUntil(() -> cluster.anyReachedHighWatermark(targetHighWatermark));
}
@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canRecoverFromSingleNodeCommittedDataLoss(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
@ -498,7 +499,15 @@ public class RaftEventSimulationTest { @@ -498,7 +499,15 @@ public class RaftEventSimulationTest {
private static class PersistentState {
final MockQuorumStateStore store = new MockQuorumStateStore();
final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID);
final MockLog log;
PersistentState(int nodeId) {
log = new MockLog(
METADATA_PARTITION,
Uuid.METADATA_TOPIC_ID,
new LogContext(String.format("[Node %s] ", nodeId))
);
}
}
private static class Cluster {
@ -516,11 +525,11 @@ public class RaftEventSimulationTest { @@ -516,11 +525,11 @@ public class RaftEventSimulationTest {
int nodeId = 0;
for (; nodeId < numVoters; nodeId++) {
voters.add(nodeId);
nodes.put(nodeId, new PersistentState());
nodes.put(nodeId, new PersistentState(nodeId));
}
for (; nodeId < numVoters + numObservers; nodeId++) {
nodes.put(nodeId, new PersistentState());
nodes.put(nodeId, new PersistentState(nodeId));
}
}
@ -674,7 +683,7 @@ public class RaftEventSimulationTest { @@ -674,7 +683,7 @@ public class RaftEventSimulationTest {
void killAndDeletePersistentState(int nodeId) {
kill(nodeId);
nodes.put(nodeId, new PersistentState());
nodes.put(nodeId, new PersistentState(nodeId));
}
private static RaftConfig.AddressSpec nodeAddress(int id) {

Loading…
Cancel
Save