@@ -20,10 +20,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -45,6 +49,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -136,7 +141,10 @@ public class StandbyTaskTest {
     }
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer());
+    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(
+        new IntegerSerializer(),
+        new IntegerSerializer()
+    );
     private final StoreChangelogReader changelogReader = new StoreChangelogReader(
         restoreStateConsumer,
         Duration.ZERO,
@@ -147,6 +155,9 @@ public class StandbyTaskTest {
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
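+    // A real StreamsMetricsImpl now backs every task in this test (the metrics argument was previously
+    // null), so the new task-close metric tests at the bottom can register and read sensors on it.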
+    private final String threadName = "threadName";
+    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), threadName);
 
     @Before
     public void setup() throws Exception {
         restoreStateConsumer.reset();
@@ -177,7 +188,14 @@ public class StandbyTaskTest {
     @Test
     public void testStorePartitions() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
         task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
     }
@@ -186,13 +204,31 @@ public class StandbyTaskTest {
     @Test
     public void testUpdateNonInitializedStore() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         try {
             task.update(partition1,
-                singletonList(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
+                        singletonList(
+                            new ConsumerRecord<>(
+                                partition1.topic(),
+                                partition1.partition(),
+                                10,
+                                0L,
+                                TimestampType.CREATE_TIME,
+                                0L,
+                                0,
+                                0,
+                                recordKey,
+                                recordValue))
             );
             fail("expected an exception");
         } catch (final NullPointerException npe) {
@@ -204,15 +240,48 @@ public class StandbyTaskTest {
     @Test
     public void testUpdate() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
         task.initializeStateStores();
         final Set<TopicPartition> partition = Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
 
-        for (final ConsumerRecord<Integer, Integer> record : asList(
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
+        for (final ConsumerRecord<Integer, Integer> record : asList(new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         10,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         1,
+                                                                                         100),
+                                                                    new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         20,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         2,
+                                                                                         100),
+                                                                    new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         30,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         3,
+                                                                                         100))) {
            restoreStateConsumer.bufferRecord(record);
        }
@@ -380,7 +449,8 @@ public class StandbyTaskTest {
     }
 
     @SuppressWarnings("unchecked")
-    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName, final StandbyTask task) {
+    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName,
+                                                                             final StandbyTask task) {
         final StandbyContextImpl context = (StandbyContextImpl) task.context();
         final List<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<>();
@@ -410,7 +480,7 @@ public class StandbyTaskTest {
             consumer,
             changelogReader,
             createConfig(baseDir),
-            null,
+            streamsMetrics,
             stateDirectory
         );
         task.initializeStateStores();
@@ -464,7 +534,9 @@ public class StandbyTaskTest {
         assertEquals(emptyList(), remaining);
     }
 
-    private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition, final long offset, final int key) {
+    private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition,
+                                                              final long offset,
+                                                              final int key) {
         final IntegerSerializer integerSerializer = new IntegerSerializer();
         return new ConsumerRecord<>(
             topicPartition.topic(),
@@ -491,7 +563,8 @@ public class StandbyTaskTest {
     @Test
     public void shouldInitializeWindowStoreWithoutException() throws IOException {
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(ofMillis(100))).count();
+        builder.stream(Collections.singleton("topic"),
+                       new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(ofMillis(100))).count();
 
         initializeStandbyStores(builder);
     }
@@ -522,7 +595,8 @@ public class StandbyTaskTest {
     public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
         consumer.assign(Collections.singletonList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()),
+                             new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
         restoreStateConsumer.updatePartitions(
@@ -540,7 +614,7 @@ public class StandbyTaskTest {
             consumer,
             changelogReader,
             config,
-            null,
+            streamsMetrics,
             stateDirectory
         );
         task.initializeStateStores();
@@ -550,9 +624,11 @@ public class StandbyTaskTest {
         final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
         task.update(
             globalTopicPartition,
-            singletonList(
-                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 50L, serializedValue, serializedValue)
-            )
+            singletonList(new ConsumerRecord<>(globalTopicPartition.topic(),
+                                               globalTopicPartition.partition(),
+                                               50L,
+                                               serializedValue,
+                                               serializedValue))
         );
 
         time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
@@ -569,7 +645,8 @@ public class StandbyTaskTest {
     public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
         consumer.assign(Collections.singletonList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()),
+                             new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
         restoreStateConsumer.updatePartitions(
@@ -586,7 +663,7 @@ public class StandbyTaskTest {
             consumer,
             changelogReader,
             config,
-            null,
+            streamsMetrics,
             stateDirectory
         ) {
             @Override
@@ -610,4 +687,62 @@ public class StandbyTaskTest {
         assertTrue(closedStateManager.get());
     }
 
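+    // Registers a cumulative-sum (Total) metric on the thread-level "task-closed" sensor,
+    // so every task close recorded on that sensor bumps the metric by one.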
+    private MetricName setupCloseTaskMetric() {
+        final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
+        final Sensor sensor = streamsMetrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
+        sensor.add(metricName, new Total());
+        return metricName;
+    }
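+
+    // Reads the metric's current value via its Measurable and asserts the expected close count.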
+    private void verifyCloseTaskMetric(final double expected,
+                                       final StreamsMetricsImpl streamsMetrics,
+                                       final MetricName metricName) {
+        final KafkaMetric metric = (KafkaMetric) streamsMetrics.metrics().get(metricName);
+        final double totalCloses = metric.measurable().measure(metric.config(), System.currentTimeMillis());
+        assertThat(totalCloses, equalTo(expected));
+    }
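+
+    // A clean, non-zombie close() should be counted exactly once on the task-closed sensor.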
+    @Test
+    public void shouldRecordTaskClosedMetricOnClose() throws IOException {
+        final MetricName metricName = setupCloseTaskMetric();
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            createConfig(baseDir),
+            streamsMetrics,
+            stateDirectory
+        );
+
+        final boolean clean = true;
+        final boolean isZombie = false;
+        task.close(clean, isZombie);
+
+        final double expectedCloseTaskMetric = 1.0;
+        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+    }
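+
+    // closeSuspended() should record on the same task-closed metric as a regular close.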
+    @Test
+    public void shouldRecordTaskClosedMetricOnCloseSuspended() throws IOException {
+        final MetricName metricName = setupCloseTaskMetric();
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            createConfig(baseDir),
+            streamsMetrics,
+            stateDirectory
+        );
+
+        final boolean clean = true;
+        final boolean isZombie = false;
+        task.closeSuspended(clean, isZombie, new RuntimeException());
+
+        final double expectedCloseTaskMetric = 1.0;
+        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+    }
 }