@ -60,8 +60,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -60,8 +60,8 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch snapshotId = new OffsetAndEpoch ( 3 , 1 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "d" , "e" , "f" ) )
. withEmptySnapshot ( snapshotId )
. deleteBeforeSnapshot ( snapshotId )
. build ( ) ;
@ -92,8 +92,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -92,8 +92,8 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch snapshotId = new OffsetAndEpoch ( 3 , 1 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "d" , "e" , "f" ) )
. withEmptySnapshot ( snapshotId )
. deleteBeforeSnapshot ( snapshotId )
. withElectedLeader ( epoch , leaderId )
@ -103,7 +103,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -103,7 +103,7 @@ final public class KafkaRaftClientSnapshotTest {
long localLogEndOffset = context . log . endOffset ( ) . offset ;
context . pollUntilRequest ( ) ;
RaftRequest . Outbound fetchRequest = context . assertSentFetchRequest ( ) ;
context . assertFetchRequestData ( fetchRequest , epoch , localLogEndOffset , snapshotId . epoch ) ;
context . assertFetchRequestData ( fetchRequest , epoch , localLogEndOffset , snapshotId . epoch ( ) ) ;
context . deliverResponse (
fetchRequest . correlationId ,
fetchRequest . destinationId ( ) ,
@ -111,7 +111,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -111,7 +111,7 @@ final public class KafkaRaftClientSnapshotTest {
) ;
context . pollUntilRequest ( ) ;
context . assertSentFetchRequest ( epoch , localLogEndOffset , snapshotId . epoch ) ;
context . assertSentFetchRequest ( epoch , localLogEndOffset , snapshotId . epoch ( ) ) ;
// Check that listener was notified of the new snapshot
try ( SnapshotReader < String > snapshot = context . listener . drainHandledSnapshot ( ) . get ( ) ) {
@ -129,8 +129,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -129,8 +129,8 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch snapshotId = new OffsetAndEpoch ( 3 , 1 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "d" , "e" , "f" ) )
. withEmptySnapshot ( snapshotId )
. deleteBeforeSnapshot ( snapshotId )
. withElectedLeader ( epoch , leaderId )
@ -140,7 +140,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -140,7 +140,7 @@ final public class KafkaRaftClientSnapshotTest {
long localLogEndOffset = context . log . endOffset ( ) . offset ;
context . pollUntilRequest ( ) ;
RaftRequest . Outbound fetchRequest = context . assertSentFetchRequest ( ) ;
context . assertFetchRequestData ( fetchRequest , epoch , localLogEndOffset , snapshotId . epoch ) ;
context . assertFetchRequestData ( fetchRequest , epoch , localLogEndOffset , snapshotId . epoch ( ) ) ;
context . deliverResponse (
fetchRequest . correlationId ,
fetchRequest . destinationId ( ) ,
@ -148,7 +148,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -148,7 +148,7 @@ final public class KafkaRaftClientSnapshotTest {
) ;
context . pollUntilRequest ( ) ;
context . assertSentFetchRequest ( epoch , localLogEndOffset , snapshotId . epoch ) ;
context . assertSentFetchRequest ( epoch , localLogEndOffset , snapshotId . epoch ( ) ) ;
RaftClientTestContext . MockListener secondListener = new RaftClientTestContext . MockListener ( OptionalInt . of ( localId ) ) ;
context . client . register ( secondListener ) ;
@ -169,9 +169,9 @@ final public class KafkaRaftClientSnapshotTest {
@@ -169,9 +169,9 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch snapshotId = new OffsetAndEpoch ( 3 , 1 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "g" , "h" , "i" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "g" , "h" , "i" ) )
. withEmptySnapshot ( snapshotId )
. deleteBeforeSnapshot ( snapshotId )
. build ( ) ;
@ -197,7 +197,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -197,7 +197,7 @@ final public class KafkaRaftClientSnapshotTest {
// Generate a new snapshot
OffsetAndEpoch secondSnapshotId = new OffsetAndEpoch ( localLogEndOffset , epoch ) ;
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( secondSnapshotId . offset - 1 , secondSnapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( secondSnapshotId . offset ( ) - 1 , secondSnapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( secondSnapshotId , snapshot . snapshotId ( ) ) ;
snapshot . freeze ( ) ;
}
@ -243,7 +243,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -243,7 +243,7 @@ final public class KafkaRaftClientSnapshotTest {
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
OffsetAndEpoch snapshotId = new OffsetAndEpoch ( localLogEndOffset , epoch ) ;
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset - 1 , snapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset ( ) - 1 , snapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( snapshotId , snapshot . snapshotId ( ) ) ;
snapshot . freeze ( ) ;
}
@ -257,8 +257,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -257,8 +257,8 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals ( Errors . NONE , Errors . forCode ( partitionResponse . errorCode ( ) ) ) ;
assertEquals ( epoch , partitionResponse . currentLeader ( ) . leaderEpoch ( ) ) ;
assertEquals ( localId , partitionResponse . currentLeader ( ) . leaderId ( ) ) ;
assertEquals ( snapshotId . epoch , partitionResponse . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset , partitionResponse . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , partitionResponse . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , partitionResponse . snapshotId ( ) . endOffset ( ) ) ;
}
@Test
@ -270,20 +270,20 @@ final public class KafkaRaftClientSnapshotTest {
@@ -270,20 +270,20 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch ( 3 , 2 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "d" , "e" , "f" ) )
. withAppendLingerMs ( 1 )
. build ( ) ;
context . becomeLeader ( ) ;
int epoch = context . currentEpoch ( ) ;
assertEquals ( oldestSnapshotId . epoch + 1 , epoch ) ;
assertEquals ( oldestSnapshotId . epoch ( ) + 1 , epoch ) ;
// Advance the highWatermark
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset ( ) - 1 , oldestSnapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( oldestSnapshotId , snapshot . snapshotId ( ) ) ;
snapshot . freeze ( ) ;
}
@ -294,7 +294,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -294,7 +294,7 @@ final public class KafkaRaftClientSnapshotTest {
context . client . poll ( ) ;
// It is an invalid request to send an last fetched epoch greater than the current epoch
context . deliverRequest ( context . fetchRequest ( epoch , otherNodeId , oldestSnapshotId . offset + 1 , epoch + 1 , 0 ) ) ;
context . deliverRequest ( context . fetchRequest ( epoch , otherNodeId , oldestSnapshotId . offset ( ) + 1 , epoch + 1 , 0 ) ) ;
context . pollUntilResponse ( ) ;
context . assertSentFetchPartitionResponse ( Errors . INVALID_REQUEST , epoch , OptionalInt . of ( localId ) ) ;
}
@ -309,20 +309,20 @@ final public class KafkaRaftClientSnapshotTest {
@@ -309,20 +309,20 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch ( 3 , 2 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch + 2 , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) + 2 , Arrays . asList ( "d" , "e" , "f" ) )
. withAppendLingerMs ( 1 )
. build ( ) ;
context . becomeLeader ( ) ;
int epoch = context . currentEpoch ( ) ;
assertEquals ( oldestSnapshotId . epoch + 2 + 1 , epoch ) ;
assertEquals ( oldestSnapshotId . epoch ( ) + 2 + 1 , epoch ) ;
// Advance the highWatermark
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset ( ) - 1 , oldestSnapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( oldestSnapshotId , snapshot . snapshotId ( ) ) ;
snapshot . freeze ( ) ;
}
@ -330,15 +330,15 @@ final public class KafkaRaftClientSnapshotTest {
@@ -330,15 +330,15 @@ final public class KafkaRaftClientSnapshotTest {
// This should truncate to the old snapshot
context . deliverRequest (
context . fetchRequest ( epoch , otherNodeId , oldestSnapshotId . offset + 1 , oldestSnapshotId . epoch + 1 , 0 )
context . fetchRequest ( epoch , otherNodeId , oldestSnapshotId . offset ( ) + 1 , oldestSnapshotId . epoch ( ) + 1 , 0 )
) ;
context . pollUntilResponse ( ) ;
FetchResponseData . PartitionData partitionResponse = context . assertSentFetchPartitionResponse ( ) ;
assertEquals ( Errors . NONE , Errors . forCode ( partitionResponse . errorCode ( ) ) ) ;
assertEquals ( epoch , partitionResponse . currentLeader ( ) . leaderEpoch ( ) ) ;
assertEquals ( localId , partitionResponse . currentLeader ( ) . leaderId ( ) ) ;
assertEquals ( oldestSnapshotId . epoch , partitionResponse . divergingEpoch ( ) . epoch ( ) ) ;
assertEquals ( oldestSnapshotId . offset , partitionResponse . divergingEpoch ( ) . endOffset ( ) ) ;
assertEquals ( oldestSnapshotId . epoch ( ) , partitionResponse . divergingEpoch ( ) . epoch ( ) ) ;
assertEquals ( oldestSnapshotId . offset ( ) , partitionResponse . divergingEpoch ( ) . endOffset ( ) ) ;
}
@Test
@ -351,21 +351,21 @@ final public class KafkaRaftClientSnapshotTest {
@@ -351,21 +351,21 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch ( 3 , 2 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( oldestSnapshotId . epoch + 2 , Arrays . asList ( "g" , "h" , "i" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) + 2 , Arrays . asList ( "g" , "h" , "i" ) )
. withAppendLingerMs ( 1 )
. build ( ) ;
context . becomeLeader ( ) ;
int epoch = context . currentEpoch ( ) ;
assertEquals ( oldestSnapshotId . epoch + 2 + 1 , epoch ) ;
assertEquals ( oldestSnapshotId . epoch ( ) + 2 + 1 , epoch ) ;
// Advance the highWatermark
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset ( ) - 1 , oldestSnapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( oldestSnapshotId , snapshot . snapshotId ( ) ) ;
snapshot . freeze ( ) ;
}
@ -373,7 +373,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -373,7 +373,7 @@ final public class KafkaRaftClientSnapshotTest {
// Send fetch request at log start offset with valid last fetched epoch
context . deliverRequest (
context . fetchRequest ( epoch , otherNodeId , oldestSnapshotId . offset , oldestSnapshotId . epoch , 0 )
context . fetchRequest ( epoch , otherNodeId , oldestSnapshotId . offset ( ) , oldestSnapshotId . epoch ( ) , 0 )
) ;
context . pollUntilResponse ( ) ;
context . assertSentFetchPartitionResponse ( Errors . NONE , epoch , OptionalInt . of ( localId ) ) ;
@ -389,21 +389,21 @@ final public class KafkaRaftClientSnapshotTest {
@@ -389,21 +389,21 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch ( 3 , 2 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( oldestSnapshotId . epoch + 2 , Arrays . asList ( "g" , "h" , "i" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) + 2 , Arrays . asList ( "g" , "h" , "i" ) )
. withAppendLingerMs ( 1 )
. build ( ) ;
context . becomeLeader ( ) ;
int epoch = context . currentEpoch ( ) ;
assertEquals ( oldestSnapshotId . epoch + 2 + 1 , epoch ) ;
assertEquals ( oldestSnapshotId . epoch ( ) + 2 + 1 , epoch ) ;
// Advance the highWatermark
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset ( ) - 1 , oldestSnapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( oldestSnapshotId , snapshot . snapshotId ( ) ) ;
snapshot . freeze ( ) ;
}
@ -412,15 +412,15 @@ final public class KafkaRaftClientSnapshotTest {
@@ -412,15 +412,15 @@ final public class KafkaRaftClientSnapshotTest {
// Send fetch with log start offset and invalid last fetched epoch
context . deliverRequest (
context . fetchRequest ( epoch , otherNodeId , oldestSnapshotId . offset , oldestSnapshotId . epoch + 1 , 0 )
context . fetchRequest ( epoch , otherNodeId , oldestSnapshotId . offset ( ) , oldestSnapshotId . epoch ( ) + 1 , 0 )
) ;
context . pollUntilResponse ( ) ;
FetchResponseData . PartitionData partitionResponse = context . assertSentFetchPartitionResponse ( ) ;
assertEquals ( Errors . NONE , Errors . forCode ( partitionResponse . errorCode ( ) ) ) ;
assertEquals ( epoch , partitionResponse . currentLeader ( ) . leaderEpoch ( ) ) ;
assertEquals ( localId , partitionResponse . currentLeader ( ) . leaderId ( ) ) ;
assertEquals ( oldestSnapshotId . epoch , partitionResponse . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( oldestSnapshotId . offset , partitionResponse . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( oldestSnapshotId . epoch ( ) , partitionResponse . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( oldestSnapshotId . offset ( ) , partitionResponse . snapshotId ( ) . endOffset ( ) ) ;
}
@Test
@ -433,21 +433,21 @@ final public class KafkaRaftClientSnapshotTest {
@@ -433,21 +433,21 @@ final public class KafkaRaftClientSnapshotTest {
OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch ( 3 , 2 ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( oldestSnapshotId . epoch + 2 , Arrays . asList ( "g" , "h" , "i" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "a" , "b" , "c" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) , Arrays . asList ( "d" , "e" , "f" ) )
. appendToLog ( oldestSnapshotId . epoch ( ) + 2 , Arrays . asList ( "g" , "h" , "i" ) )
. withAppendLingerMs ( 1 )
. build ( ) ;
context . becomeLeader ( ) ;
int epoch = context . currentEpoch ( ) ;
assertEquals ( oldestSnapshotId . epoch + 2 + 1 , epoch ) ;
assertEquals ( oldestSnapshotId . epoch ( ) + 2 + 1 , epoch ) ;
// Advance the highWatermark
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
// Create a snapshot at the high watermark
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset - 1 , oldestSnapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( oldestSnapshotId . offset ( ) - 1 , oldestSnapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( oldestSnapshotId , snapshot . snapshotId ( ) ) ;
snapshot . freeze ( ) ;
}
@ -459,7 +459,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -459,7 +459,7 @@ final public class KafkaRaftClientSnapshotTest {
epoch ,
otherNodeId ,
context . log . endOffset ( ) . offset ,
oldestSnapshotId . epoch - 1 ,
oldestSnapshotId . epoch ( ) - 1 ,
0
)
) ;
@ -468,8 +468,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -468,8 +468,8 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals ( Errors . NONE , Errors . forCode ( partitionResponse . errorCode ( ) ) ) ;
assertEquals ( epoch , partitionResponse . currentLeader ( ) . leaderEpoch ( ) ) ;
assertEquals ( localId , partitionResponse . currentLeader ( ) . leaderId ( ) ) ;
assertEquals ( oldestSnapshotId . epoch , partitionResponse . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( oldestSnapshotId . offset , partitionResponse . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( oldestSnapshotId . epoch ( ) , partitionResponse . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( oldestSnapshotId . offset ( ) , partitionResponse . snapshotId ( ) . endOffset ( ) ) ;
}
@Test
@ -529,7 +529,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -529,7 +529,7 @@ final public class KafkaRaftClientSnapshotTest {
List < String > records = Arrays . asList ( "foo" , "bar" ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "a" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "a" ) )
. build ( ) ;
context . becomeLeader ( ) ;
@ -537,7 +537,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -537,7 +537,7 @@ final public class KafkaRaftClientSnapshotTest {
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset - 1 , snapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset ( ) - 1 , snapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( snapshotId , snapshot . snapshotId ( ) ) ;
snapshot . append ( records ) ;
snapshot . freeze ( ) ;
@ -578,7 +578,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -578,7 +578,7 @@ final public class KafkaRaftClientSnapshotTest {
List < String > records = Arrays . asList ( "foo" , "bar" ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( snapshotId . epoch , records )
. appendToLog ( snapshotId . epoch ( ) , records )
. build ( ) ;
context . becomeLeader ( ) ;
@ -586,7 +586,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -586,7 +586,7 @@ final public class KafkaRaftClientSnapshotTest {
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset - 1 , snapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset ( ) - 1 , snapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( snapshotId , snapshot . snapshotId ( ) ) ;
snapshot . append ( records ) ;
snapshot . freeze ( ) ;
@ -687,7 +687,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -687,7 +687,7 @@ final public class KafkaRaftClientSnapshotTest {
List < String > records = Arrays . asList ( "foo" , "bar" ) ;
RaftClientTestContext context = new RaftClientTestContext . Builder ( localId , voters )
. appendToLog ( snapshotId . epoch , Arrays . asList ( "a" ) )
. appendToLog ( snapshotId . epoch ( ) , Arrays . asList ( "a" ) )
. build ( ) ;
context . becomeLeader ( ) ;
@ -695,7 +695,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -695,7 +695,7 @@ final public class KafkaRaftClientSnapshotTest {
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset - 1 , snapshotId . epoch , 0 ) . get ( ) ) {
try ( SnapshotWriter < String > snapshot = context . client . createSnapshot ( snapshotId . offset ( ) - 1 , snapshotId . epoch ( ) , 0 ) . get ( ) ) {
assertEquals ( snapshotId , snapshot . snapshotId ( ) ) ;
snapshot . append ( records ) ;
snapshot . freeze ( ) ;
@ -882,8 +882,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -882,8 +882,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
List < String > records = Arrays . asList ( "foo" , "bar" ) ;
@ -909,7 +909,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -909,7 +909,7 @@ final public class KafkaRaftClientSnapshotTest {
context . pollUntilRequest ( ) ;
fetchRequest = context . assertSentFetchRequest ( ) ;
context . assertFetchRequestData ( fetchRequest , epoch , snapshotId . offset , snapshotId . epoch ) ;
context . assertFetchRequestData ( fetchRequest , epoch , snapshotId . offset ( ) , snapshotId . epoch ( ) ) ;
// Check that the snapshot was written to the log
RawSnapshotReader snapshot = context . log . readSnapshot ( snapshotId ) . get ( ) ;
@ -953,8 +953,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -953,8 +953,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
List < String > records = Arrays . asList ( "foo" , "bar" ) ;
@ -989,8 +989,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -989,8 +989,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( sendingBuffer . limit ( ) , request . position ( ) ) ;
sendingBuffer = memorySnapshot . buffer ( ) . slice ( ) ;
@ -1012,7 +1012,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1012,7 +1012,7 @@ final public class KafkaRaftClientSnapshotTest {
context . pollUntilRequest ( ) ;
fetchRequest = context . assertSentFetchRequest ( ) ;
context . assertFetchRequestData ( fetchRequest , epoch , snapshotId . offset , snapshotId . epoch ) ;
context . assertFetchRequestData ( fetchRequest , epoch , snapshotId . offset ( ) , snapshotId . epoch ( ) ) ;
// Check that the snapshot was written to the log
RawSnapshotReader snapshot = context . log . readSnapshot ( snapshotId ) . get ( ) ;
@ -1056,8 +1056,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1056,8 +1056,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
// Reply with a snapshot not found error
@ -1114,8 +1114,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1114,8 +1114,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
// Reply with new leader response
@ -1171,8 +1171,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1171,8 +1171,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
// Reply with new leader epoch
@ -1228,8 +1228,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1228,8 +1228,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
// Reply with unknown leader epoch
@ -1260,8 +1260,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1260,8 +1260,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
}
@ -1295,8 +1295,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1295,8 +1295,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
// Reply with an invalid snapshot id endOffset
@ -1314,7 +1314,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1314,7 +1314,7 @@ final public class KafkaRaftClientSnapshotTest {
responsePartitionSnapshot
. snapshotId ( )
. setEndOffset ( - 1 )
. setEpoch ( snapshotId . epoch ) ;
. setEpoch ( snapshotId . epoch ( ) ) ;
return responsePartitionSnapshot ;
}
@ -1342,8 +1342,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1342,8 +1342,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
// Reply with an invalid snapshot id epoch
@ -1360,7 +1360,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1360,7 +1360,7 @@ final public class KafkaRaftClientSnapshotTest {
responsePartitionSnapshot
. snapshotId ( )
. setEndOffset ( snapshotId . offset )
. setEndOffset ( snapshotId . offset ( ) )
. setEpoch ( - 1 ) ;
return responsePartitionSnapshot ;
@ -1406,8 +1406,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1406,8 +1406,8 @@ final public class KafkaRaftClientSnapshotTest {
localId ,
Integer . MAX_VALUE
) . get ( ) ;
assertEquals ( snapshotId . offset , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( snapshotId . offset ( ) , request . snapshotId ( ) . endOffset ( ) ) ;
assertEquals ( snapshotId . epoch ( ) , request . snapshotId ( ) . epoch ( ) ) ;
assertEquals ( 0 , request . position ( ) ) ;
// Sleeping for fetch timeout should transition to candidate
@ -1432,8 +1432,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1432,8 +1432,8 @@ final public class KafkaRaftClientSnapshotTest {
responsePartitionSnapshot
. snapshotId ( )
. setEndOffset ( snapshotId . offset )
. setEpoch ( snapshotId . epoch ) ;
. setEndOffset ( snapshotId . offset ( ) )
. setEpoch ( snapshotId . epoch ( ) ) ;
return responsePartitionSnapshot ;
}
@ -1532,7 +1532,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1532,7 +1532,7 @@ final public class KafkaRaftClientSnapshotTest {
// When leader creating snapshot:
// 1.1 high watermark cannot be empty
assertEquals ( OptionalLong . empty ( ) , context . client . highWatermark ( ) ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId1 . offset , invalidSnapshotId1 . epoch , 0 ) ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId1 . offset ( ) , invalidSnapshotId1 . epoch ( ) , 0 ) ) ;
// 1.2 high watermark must larger than or equal to the snapshotId's endOffset
context . advanceLocalLeaderHighWatermarkToLogEndOffset ( ) ;
@ -1544,17 +1544,17 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1544,17 +1544,17 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals ( context . log . endOffset ( ) . offset , context . client . highWatermark ( ) . getAsLong ( ) + newRecords . size ( ) ) ;
OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch ( context . client . highWatermark ( ) . getAsLong ( ) + 1 , currentEpoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId2 . offset , invalidSnapshotId2 . epoch , 0 ) ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId2 . offset ( ) , invalidSnapshotId2 . epoch ( ) , 0 ) ) ;
// 2 the quorum epoch must larger than or equal to the snapshotId's epoch
OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch ( context . client . highWatermark ( ) . getAsLong ( ) - 2 , currentEpoch + 1 ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId3 . offset , invalidSnapshotId3 . epoch , 0 ) ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId3 . offset ( ) , invalidSnapshotId3 . epoch ( ) , 0 ) ) ;
// 3 the snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context . log . endOffsetForEpoch ( epoch ) ;
assertEquals ( epoch , endOffsetForEpoch . epoch ) ;
OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch ( endOffsetForEpoch . offset + 1 , epoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId4 . offset , invalidSnapshotId4 . epoch , 0 ) ) ;
assertEquals ( epoch , endOffsetForEpoch . epoch ( ) ) ;
OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch ( endOffsetForEpoch . offset ( ) + 1 , epoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId4 . offset ( ) , invalidSnapshotId4 . epoch ( ) , 0 ) ) ;
}
@Test
@ -1574,7 +1574,7 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1574,7 +1574,7 @@ final public class KafkaRaftClientSnapshotTest {
// 1) The high watermark cannot be empty
assertEquals ( OptionalLong . empty ( ) , context . client . highWatermark ( ) ) ;
OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch ( 0 , 0 ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId1 . offset , invalidSnapshotId1 . epoch , 0 ) ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId1 . offset ( ) , invalidSnapshotId1 . epoch ( ) , 0 ) ) ;
// Poll for our first fetch request
context . pollUntilRequest ( ) ;
@ -1592,11 +1592,11 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1592,11 +1592,11 @@ final public class KafkaRaftClientSnapshotTest {
// 2) The high watermark must be larger than or equal to the snapshotId's endOffset
int currentEpoch = context . currentEpoch ( ) ;
OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch ( context . client . highWatermark ( ) . getAsLong ( ) + 1 , currentEpoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId2 . offset , invalidSnapshotId2 . epoch , 0 ) ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId2 . offset ( ) , invalidSnapshotId2 . epoch ( ) , 0 ) ) ;
// 3) The quorum epoch must be larger than or equal to the snapshotId's epoch
OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch ( context . client . highWatermark ( ) . getAsLong ( ) , currentEpoch + 1 ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId3 . offset , invalidSnapshotId3 . epoch , 0 ) ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId3 . offset ( ) , invalidSnapshotId3 . epoch ( ) , 0 ) ) ;
// The high watermark advances to be larger than log.endOffsetForEpoch(3), to test the case 3
context . pollUntilRequest ( ) ;
@ -1613,9 +1613,9 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1613,9 +1613,9 @@ final public class KafkaRaftClientSnapshotTest {
// 4) The snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context . log . endOffsetForEpoch ( 3 ) ;
assertEquals ( 3 , endOffsetForEpoch . epoch ) ;
OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch ( endOffsetForEpoch . offset + 1 , epoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId4 . offset , invalidSnapshotId4 . epoch , 0 ) ) ;
assertEquals ( 3 , endOffsetForEpoch . epoch ( ) ) ;
OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch ( endOffsetForEpoch . offset ( ) + 1 , epoch ) ;
assertThrows ( IllegalArgumentException . class , ( ) - > context . client . createSnapshot ( invalidSnapshotId4 . offset ( ) , invalidSnapshotId4 . epoch ( ) , 0 ) ) ;
}
private static FetchSnapshotRequestData fetchSnapshotRequest (
@ -1637,8 +1637,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1637,8 +1637,8 @@ final public class KafkaRaftClientSnapshotTest {
long position
) {
FetchSnapshotRequestData . SnapshotId snapshotId = new FetchSnapshotRequestData . SnapshotId ( )
. setEndOffset ( offsetAndEpoch . offset )
. setEpoch ( offsetAndEpoch . epoch ) ;
. setEndOffset ( offsetAndEpoch . offset ( ) )
. setEpoch ( offsetAndEpoch . epoch ( ) ) ;
FetchSnapshotRequestData request = FetchSnapshotRequest . singleton (
clusterId ,
@ -1671,8 +1671,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1671,8 +1671,8 @@ final public class KafkaRaftClientSnapshotTest {
. setLeaderId ( leaderId ) ;
partitionSnapshot . snapshotId ( )
. setEndOffset ( snapshotId . offset )
. setEpoch ( snapshotId . epoch ) ;
. setEndOffset ( snapshotId . offset ( ) )
. setEpoch ( snapshotId . epoch ( ) ) ;
return partitionSnapshot
. setSize ( size )
@ -1698,8 +1698,8 @@ final public class KafkaRaftClientSnapshotTest {
@@ -1698,8 +1698,8 @@ final public class KafkaRaftClientSnapshotTest {
. setLeaderId ( leaderId ) ;
partitionData . snapshotId ( )
. setEpoch ( snapshotId . epoch )
. setEndOffset ( snapshotId . offset ) ;
. setEpoch ( snapshotId . epoch ( ) )
. setEndOffset ( snapshotId . offset ( ) ) ;
} ) ;
}