@@ -20,9 +20,7 @@ import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -30,6 +28,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Supplier;
import java.util.List;
import java.util.Random;
@@ -38,14 +37,13 @@ import static org.apache.kafka.common.utils.Utils.utf8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@RunWith(value = Parameterized.class)
public class MemoryRecordsBuilderTest {
    @Rule
    public ExpectedException exceptionRule = ExpectedException.none();
    private final CompressionType compressionType;
    private final int bufferOffset;
    private final Time time;
@@ -58,17 +56,25 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void testWriteEmptyRecordSet() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
        byte magic = RecordBatch.MAGIC_VALUE_V0;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(128);
        buffer.position(bufferOffset);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        MemoryRecords records = builder.build();
        assertEquals(0, records.sizeInBytes());
        assertEquals(bufferOffset, buffer.position());
        Supplier<MemoryRecordsBuilder> builderSupplier = () -> new MemoryRecordsBuilder(buffer, magic,
                compressionType, TimestampType.CREATE_TIME, 0L, 0L,
                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        if (compressionType != CompressionType.ZSTD) {
            MemoryRecords records = builderSupplier.get().build();
            assertEquals(0, records.sizeInBytes());
            assertEquals(bufferOffset, buffer.position());
        } else {
            Exception e = assertThrows(IllegalArgumentException.class, () -> builderSupplier.get().build());
            assertEquals(e.getMessage(), "ZStandard compression is not supported for magic " + magic);
        }
    }
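A side note on the pattern introduced above: wrapping the builder construction in a Supplier defers the call that may throw, so the same expression can either be built normally or handed to assertThrows, which (in JUnit 4.13+) runs the lambda and returns the caught exception for further assertions. A minimal standalone sketch of that idiom, using a hypothetical mightThrow helper rather than the real MemoryRecordsBuilder constructor:

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThrows;

    import java.util.function.Supplier;

    import org.junit.Test;

    public class AssertThrowsIdiomSketchTest {

        // Hypothetical stand-in for a constructor that rejects some argument combinations.
        private static String mightThrow(boolean reject) {
            if (reject)
                throw new IllegalArgumentException("rejected");
            return "ok";
        }

        @Test
        public void deferredCallCanBeAssertedEitherWay() {
            // The Supplier captures the call without executing it yet.
            Supplier<String> supplier = () -> mightThrow(false);
            assertEquals("ok", supplier.get());

            // The same shape handles the failing case: assertThrows executes the lambda
            // and hands back the thrown exception so its message can be checked.
            Exception e = assertThrows(IllegalArgumentException.class, () -> mightThrow(true));
            assertEquals("rejected", e.getMessage());
        }
    }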
@Test
@@ -215,18 +221,19 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void testCompressionRateV0() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
        byte magic = RecordBatch.MAGIC_VALUE_V0;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.position(bufferOffset);
        LegacyRecord[] records = new LegacyRecord[] {
                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
                LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()),
                LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()),
                LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()),
        };
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
@@ -272,18 +279,19 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void testCompressionRateV1() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
        byte magic = RecordBatch.MAGIC_VALUE_V1;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.position(bufferOffset);
        LegacyRecord[] records = new LegacyRecord[] {
                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
                LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()),
                LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()),
                LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()),
        };
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
@@ -305,13 +313,14 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void buildUsingLogAppendTime() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
        byte magic = RecordBatch.MAGIC_VALUE_V1;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.position(bufferOffset);
        long logAppendTime = System.currentTimeMillis();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
                RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.append(0L, "a".getBytes(), "1".getBytes());
@@ -336,13 +345,14 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void buildUsingCreateTime() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
        byte magic = RecordBatch.MAGIC_VALUE_V1;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.position(bufferOffset);
        long logAppendTime = System.currentTimeMillis();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.append(0L, "a".getBytes(), "1".getBytes());
@@ -369,7 +379,8 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void testAppendedChecksumConsistency() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
        assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V0);
        assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1);
        ByteBuffer buffer = ByteBuffer.allocate(512);
        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
@@ -415,13 +426,14 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void writePastLimit() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
        byte magic = RecordBatch.MAGIC_VALUE_V1;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.position(bufferOffset);
        long logAppendTime = System.currentTimeMillis();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.setEstimatedCompressionRatio(0.5f);
@@ -462,11 +474,6 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void convertV2ToV1UsingMixedCreateAndLogAppendTime() {
        if (compressionType == CompressionType.ZSTD) {
            exceptionRule.expect(UnsupportedCompressionTypeException.class);
            exceptionRule.expectMessage("Down-conversion of zstandard-compressed batches is not supported");
        }
        ByteBuffer buffer = ByteBuffer.allocate(512);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
                compressionType, TimestampType.LOG_APPEND_TIME, 0L);
@@ -493,36 +500,44 @@ public class MemoryRecordsBuilderTest {
        buffer.flip();
        ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
                .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
        MemoryRecords records = convertedRecords.records();
        Supplier<ConvertedRecords<MemoryRecords>> convertedRecordsSupplier = () ->
                MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
        // Transactional markers are skipped when down converting to V1, so exclude them from size
        verifyRecordsProcessingStats(convertedRecords.recordConversionStats(),
        if (compressionType != CompressionType.ZSTD) {
            ConvertedRecords<MemoryRecords> convertedRecords = convertedRecordsSupplier.get();
            MemoryRecords records = convertedRecords.records();
            // Transactional markers are skipped when down converting to V1, so exclude them from size
            verifyRecordsProcessingStats(convertedRecords.recordConversionStats(),
                3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers);
        List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
        if (compressionType != CompressionType.NONE) {
            assertEquals(2, batches.size());
            assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
            assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
            List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
            if (compressionType != CompressionType.NONE) {
                assertEquals(2, batches.size());
                assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
                assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
            } else {
                assertEquals(3, batches.size());
                assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
                assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
                assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType());
            }
            List<Record> logRecords = Utils.toList(records.records().iterator());
            assertEquals(3, logRecords.size());
            assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
            assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
            assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
        } else {
            assertEquals(3, batches.size());
            assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
            assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
            assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType());
            Exception e = assertThrows(UnsupportedCompressionTypeException.class, convertedRecordsSupplier::get);
            assertEquals("Down-conversion of zstandard-compressed batches is not supported", e.getMessage());
        }
        List<Record> logRecords = Utils.toList(records.records().iterator());
        assertEquals(3, logRecords.size());
        assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
        assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
        assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
    }
    @Test
    public void convertToV1WithMixedV0AndV2Data() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
        assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V0);
        assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1);
        ByteBuffer buffer = ByteBuffer.allocate(512);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
@@ -598,31 +613,28 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
        byte magic = RecordBatch.MAGIC_VALUE_V0;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(128);
        buffer.position(bufferOffset);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
                RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.abort();
        try {
            builder.build();
            fail("Should have thrown KafkaException");
        } catch (IllegalStateException e) {
            // ok
        }
        assertThrows(IllegalStateException.class, builder::build);
    }
    @Test
    public void shouldResetBufferToInitialPositionOnAbort() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
        byte magic = RecordBatch.MAGIC_VALUE_V0;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(128);
        buffer.position(bufferOffset);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.append(0L, "a".getBytes(), "1".getBytes());
@@ -632,12 +644,13 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
        byte magic = RecordBatch.MAGIC_VALUE_V0;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(128);
        buffer.position(bufferOffset);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.abort();
@@ -651,12 +664,13 @@ public class MemoryRecordsBuilderTest {
    @Test
    public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() {
        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
        byte magic = RecordBatch.MAGIC_VALUE_V0;
        assumeAtLeastV2OrNotZstd(magic);
        ByteBuffer buffer = ByteBuffer.allocate(128);
        buffer.position(bufferOffset);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.abort();
@@ -734,10 +748,7 @@ public class MemoryRecordsBuilderTest {
        }
    }
    private void expectExceptionWithZStd(CompressionType compressionType, byte magic) {
        if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) {
            exceptionRule.expect(IllegalArgumentException.class);
            exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic);
        }
    private void assumeAtLeastV2OrNotZstd(byte magic) {
        assumeTrue(compressionType != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2);
    }
}
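For context on the other new helper: assumeAtLeastV2OrNotZstd leans on org.junit.Assume.assumeTrue, which aborts the current test (reporting it as skipped rather than failed) when its condition is false, so unsupported compression/magic combinations no longer need an expected-exception rule. A minimal sketch of that mechanism, with a made-up condition purely for illustration:

    import static org.junit.Assume.assumeTrue;

    import org.junit.Test;

    public class AssumeSketchTest {

        @Test
        public void skippedWhenAssumptionFails() {
            // Hypothetical condition; in the helper above it is
            // "compressionType != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2".
            boolean supportedCombination = false;

            // When the condition is false, assumeTrue throws AssumptionViolatedException
            // and the runner marks this test as skipped, so the line below never executes.
            assumeTrue(supportedCombination);

            throw new AssertionError("unreachable when the assumption does not hold");
        }
    }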