@ -42,11 +42,13 @@ import java.nio.ByteBuffer;
@@ -42,11 +42,13 @@ import java.nio.ByteBuffer;
import java.util.Arrays ;
import java.util.List ;
import java.util.Optional ;
import java.util.OptionalLong ;
import java.util.OptionalInt ;
import java.util.Set ;
import static org.junit.jupiter.api.Assertions.assertEquals ;
import static org.junit.jupiter.api.Assertions.assertTrue ;
import static org.junit.jupiter.api.Assertions.assertThrows ;
final public class KafkaRaftClientSnapshotTest {
@Test
@ -237,10 +239,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -237,10 +239,7 @@ final public class KafkaRaftClientSnapshotTest {
) ;
// Advance the highWatermark
context . deliverRequest ( context . fetchRequest ( epoch , otherNodeId , localLogEndOffset , epoch , 0 ) ) ;
context . pollUntilResponse ( ) ;
context . assertSentFetchPartitionResponse ( Errors . NONE , epoch , OptionalInt . of ( localId ) ) ;
assertEquals ( localLogEndOffset , context . client . highWatermark ( ) . getAsLong ( ) ) ;
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
OffsetAndEpoch snapshotId = new OffsetAndEpoch ( localLogEndOffset , epoch ) ;
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset - 1 , snapshotId . epoch , 0 ) . get ( ) ) {
@ -280,11 +279,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -280,11 +279,7 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals ( oldestSnapshotId . epoch + 1 , epoch ) ;
// Advance the highWatermark
long localLogEndOffset = context . log . endOffset ( ) . offset ;
context . deliverRequest ( context . fetchRequest ( epoch , otherNodeId , localLogEndOffset , epoch , 0 ) ) ;
context . pollUntilResponse ( ) ;
context . assertSentFetchPartitionResponse ( Errors . NONE , epoch , OptionalInt . of ( localId ) ) ;
assertEquals ( localLogEndOffset , context . client . highWatermark ( ) . getAsLong ( ) ) ;
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
@ -323,12 +318,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -323,12 +318,7 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals ( oldestSnapshotId . epoch + 2 + 1 , epoch ) ;
// Advance the highWatermark
context . deliverRequest (
context . fetchRequest ( epoch , syncNodeId , context . log . endOffset ( ) . offset , epoch , 0 )
) ;
context . pollUntilResponse ( ) ;
context . assertSentFetchPartitionResponse ( Errors . NONE , epoch , OptionalInt . of ( localId ) ) ;
assertEquals ( context . log . endOffset ( ) . offset , context . client . highWatermark ( ) . getAsLong ( ) ) ;
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
@ -371,12 +361,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -371,12 +361,7 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals ( oldestSnapshotId . epoch + 2 + 1 , epoch ) ;
// Advance the highWatermark
context . deliverRequest (
context . fetchRequest ( epoch , syncNodeId , context . log . endOffset ( ) . offset , epoch , 0 )
) ;
context . pollUntilResponse ( ) ;
context . assertSentFetchPartitionResponse ( Errors . NONE , epoch , OptionalInt . of ( localId ) ) ;
assertEquals ( context . log . endOffset ( ) . offset , context . client . highWatermark ( ) . getAsLong ( ) ) ;
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
@ -414,12 +399,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -414,12 +399,7 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals ( oldestSnapshotId . epoch + 2 + 1 , epoch ) ;
// Advance the highWatermark
context . deliverRequest (
context . fetchRequest ( epoch , syncNodeId , context . log . endOffset ( ) . offset , epoch , 0 )
) ;
context . pollUntilResponse ( ) ;
context . assertSentFetchPartitionResponse ( Errors . NONE , epoch , OptionalInt . of ( localId ) ) ;
assertEquals ( context . log . endOffset ( ) . offset , context . client . highWatermark ( ) . getAsLong ( ) ) ;
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
@ -463,12 +443,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -463,12 +443,7 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals ( oldestSnapshotId . epoch + 2 + 1 , epoch ) ;
// Advance the highWatermark
context . deliverRequest (
context . fetchRequest ( epoch , syncNodeId , context . log . endOffset ( ) . offset , epoch , 0 )
) ;
context . pollUntilResponse ( ) ;
context . assertSentFetchPartitionResponse ( Errors . NONE , epoch , OptionalInt . of ( localId ) ) ;
assertEquals ( context . log . endOffset ( ) . offset , context . client . highWatermark ( ) . getAsLong ( ) ) ;
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
@ -1535,6 +1510,113 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1535,6 +1510,113 @@ final public class KafkaRaftClientSnapshotTest {
context . assertSentFetchSnapshotResponse ( Errors . INCONSISTENT_CLUSTER_ID ) ;
}
@Test
public void testCreateSnapshotAsLeaderWithInvalidSnapshotId ( ) throws Exception {
int localId = 0 ;
int otherNodeId = localId + 1 ;
Set < Integer > voters = Utils . mkSet ( localId , otherNodeId ) ;
int epoch = 2 ;
List < String > appendRecords = Arrays . asList ( "a" , "b" , "c" ) ;
OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch ( 3 , epoch ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( epoch , appendRecords )
. withAppendLingerMs ( 1 )
. build ( ) ;
context . becomeLeader ( ) ;
int currentEpoch = context . currentEpoch ( ) ;
// When leader creating snapshot:
// 1.1 high watermark cannot be empty
assertEquals ( OptionalLong . empty ( ) , context . client . highWatermark ( ) ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId1 . offset , invalidSnapshotId1 . epoch , 0 ) ) ;
// 1.2 high watermark must larger than or equal to the snapshotId's endOffset
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// append some more records to make the LEO > high watermark
List < String > newRecords = Arrays . asList ( "d" , "e" , "f" ) ;
context . client . scheduleAppend ( currentEpoch , newRecords ) ;
context . time . sleep ( context . appendLingerMs ( ) ) ;
context . client . poll ( ) ;
assertEquals ( context . log . endOffset ( ) . offset , context . client . highWatermark ( ) . getAsLong ( ) + newRecords . size ( ) ) ;
OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch ( context . client . highWatermark ( ) . getAsLong ( ) + 1 , currentEpoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId2 . offset , invalidSnapshotId2 . epoch , 0 ) ) ;
// 2 the quorum epoch must larger than or equal to the snapshotId's epoch
OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch ( context . client . highWatermark ( ) . getAsLong ( ) - 2 , currentEpoch + 1 ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId3 . offset , invalidSnapshotId3 . epoch , 0 ) ) ;
// 3 the snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context . log . endOffsetForEpoch ( epoch ) ;
assertEquals ( epoch , endOffsetForEpoch . epoch ) ;
OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch ( endOffsetForEpoch . offset + 1 , epoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId4 . offset , invalidSnapshotId4 . epoch , 0 ) ) ;
}
@Test
public void testCreateSnapshotAsFollowerWithInvalidSnapshotId ( ) throws Exception {
int localId = 0 ;
int leaderId = 1 ;
int otherFollowerId = 2 ;
int epoch = 5 ;
Set < Integer > voters = Utils . mkSet ( localId , leaderId , otherFollowerId ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. withElectedLeader ( epoch , leaderId )
. build ( ) ;
context . assertElectedLeader ( epoch , leaderId ) ;
// When follower creating snapshot:
// 1) The high watermark cannot be empty
assertEquals ( OptionalLong . empty ( ) , context . client . highWatermark ( ) ) ;
OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch ( 0 , 0 ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId1 . offset , invalidSnapshotId1 . epoch , 0 ) ) ;
// Poll for our first fetch request
context . pollUntilRequest ( ) ;
RaftRequest . Outbound fetchRequest = context . assertSentFetchRequest ( ) ;
assertTrue ( voters . contains ( fetchRequest . destinationId ( ) ) ) ;
context . assertFetchRequestData ( fetchRequest , epoch , 0L , 0 ) ;
// The response does not advance the high watermark
List < String > records1 = Arrays . asList ( "a" , "b" , "c" ) ;
MemoryRecords batch1 = context . buildBatch ( 0L , 3 , records1 ) ;
context . deliverResponse ( fetchRequest . correlationId , fetchRequest . destinationId ( ) ,
context . fetchResponse ( epoch , leaderId , batch1 , 0L , Errors . NONE ) ) ;
context . client . poll ( ) ;
// 2) The high watermark must be larger than or equal to the snapshotId's endOffset
int currentEpoch = context . currentEpoch ( ) ;
OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch ( context . client . highWatermark ( ) . getAsLong ( ) + 1 , currentEpoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId2 . offset , invalidSnapshotId2 . epoch , 0 ) ) ;
// 3) The quorum epoch must be larger than or equal to the snapshotId's epoch
OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch ( context . client . highWatermark ( ) . getAsLong ( ) , currentEpoch + 1 ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId3 . offset , invalidSnapshotId3 . epoch , 0 ) ) ;
// The high watermark advances to be larger than log.endOffsetForEpoch(3), to test the case 3
context . pollUntilRequest ( ) ;
fetchRequest = context . assertSentFetchRequest ( ) ;
assertTrue ( voters . contains ( fetchRequest . destinationId ( ) ) ) ;
context . assertFetchRequestData ( fetchRequest , epoch , 3L , 3 ) ;
List < String > records2 = Arrays . asList ( "d" , "e" , "f" ) ;
MemoryRecords batch2 = context . buildBatch ( 3L , 4 , records2 ) ;
context . deliverResponse ( fetchRequest . correlationId , fetchRequest . destinationId ( ) ,
context . fetchResponse ( epoch , leaderId , batch2 , 6L , Errors . NONE ) ) ;
context . client . poll ( ) ;
assertEquals ( 6L , context . client . highWatermark ( ) . getAsLong ( ) ) ;
// 4) The snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context . log . endOffsetForEpoch ( 3 ) ;
assertEquals ( 3 , endOffsetForEpoch . epoch ) ;
OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch ( endOffsetForEpoch . offset + 1 , epoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId4 . offset , invalidSnapshotId4 . epoch , 0 ) ) ;
}
private static FetchSnapshotRequestData fetchSnapshotRequest (
TopicPartition topicPartition ,
int epoch ,