@ -16,19 +16,28 @@
@@ -16,19 +16,28 @@
* /
package org.apache.kafka.streams.processor.internals ;
import org.apache.kafka.clients.admin.AdminClient ;
import org.apache.kafka.clients.admin.ConfigEntry ;
import org.apache.kafka.clients.admin.CreateTopicsResult ;
import org.apache.kafka.clients.admin.DescribeTopicsResult ;
import org.apache.kafka.clients.admin.MockAdminClient ;
import org.apache.kafka.clients.admin.NewTopic ;
import org.apache.kafka.clients.admin.TopicDescription ;
import org.apache.kafka.clients.producer.ProducerConfig ;
import org.apache.kafka.common.KafkaFuture ;
import org.apache.kafka.common.Node ;
import org.apache.kafka.common.TopicPartitionInfo ;
import org.apache.kafka.common.config.ConfigResource ;
import org.apache.kafka.common.config.TopicConfig ;
import org.apache.kafka.common.errors.TimeoutException ;
import org.apache.kafka.common.errors.TopicExistsException ;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException ;
import org.apache.kafka.common.internals.KafkaFutureImpl ;
import org.apache.kafka.common.utils.Utils ;
import org.apache.kafka.streams.StreamsConfig ;
import org.apache.kafka.streams.errors.StreamsException ;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender ;
import org.easymock.EasyMock ;
import org.junit.After ;
import org.junit.Before ;
import org.junit.Test ;
@ -38,6 +47,7 @@ import java.util.Collections;
@@ -38,6 +47,7 @@ import java.util.Collections;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.Optional ;
import static org.junit.Assert.assertEquals ;
import static org.junit.Assert.assertNull ;
@ -90,18 +100,18 @@ public class InternalTopicManagerTest {
@@ -90,18 +100,18 @@ public class InternalTopicManagerTest {
mockAdminClient . addTopic (
false ,
topic ,
Collections . singletonList ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . < Node > emptyList ( ) ) ) ,
Collections . singletonList ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . emptyList ( ) ) ) ,
null ) ;
assertEquals ( Collections . singletonMap ( topic , 1 ) , internalTopicManager . getNumPartitions ( Collections . singleton ( topic ) ) ) ;
}
@Test
public void shouldCreateRequiredTopics ( ) throws Exception {
final InternalTopicConfig topicConfig = new RepartitionTopicConfig ( topic , Collections . < String , String > emptyMap ( ) ) ;
final InternalTopicConfig topicConfig = new RepartitionTopicConfig ( topic , Collections . emptyMap ( ) ) ;
topicConfig . setNumberOfPartitions ( 1 ) ;
final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig ( topic2 , Collections . < String , String > emptyMap ( ) ) ;
final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig ( topic2 , Collections . emptyMap ( ) ) ;
topicConfig2 . setNumberOfPartitions ( 1 ) ;
final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig ( topic3 , Collections . < String , String > emptyMap ( ) ) ;
final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig ( topic3 , Collections . emptyMap ( ) ) ;
topicConfig3 . setNumberOfPartitions ( 1 ) ;
internalTopicManager . makeReady ( Collections . singletonMap ( topic , topicConfig ) ) ;
@ -111,17 +121,17 @@ public class InternalTopicManagerTest {
@@ -111,17 +121,17 @@ public class InternalTopicManagerTest {
assertEquals ( Utils . mkSet ( topic , topic2 , topic3 ) , mockAdminClient . listTopics ( ) . names ( ) . get ( ) ) ;
assertEquals ( new TopicDescription ( topic , false , new ArrayList < TopicPartitionInfo > ( ) {
{
add ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . < Node > emptyList ( ) ) ) ;
add ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . emptyList ( ) ) ) ;
}
} ) , mockAdminClient . describeTopics ( Collections . singleton ( topic ) ) . values ( ) . get ( topic ) . get ( ) ) ;
assertEquals ( new TopicDescription ( topic2 , false , new ArrayList < TopicPartitionInfo > ( ) {
{
add ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . < Node > emptyList ( ) ) ) ;
add ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . emptyList ( ) ) ) ;
}
} ) , mockAdminClient . describeTopics ( Collections . singleton ( topic2 ) ) . values ( ) . get ( topic2 ) . get ( ) ) ;
assertEquals ( new TopicDescription ( topic3 , false , new ArrayList < TopicPartitionInfo > ( ) {
{
add ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . < Node > emptyList ( ) ) ) ;
add ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . emptyList ( ) ) ) ;
}
} ) , mockAdminClient . describeTopics ( Collections . singleton ( topic3 ) ) . values ( ) . get ( topic3 ) . get ( ) ) ;
@ -132,7 +142,49 @@ public class InternalTopicManagerTest {
@@ -132,7 +142,49 @@ public class InternalTopicManagerTest {
assertEquals ( new ConfigEntry ( TopicConfig . CLEANUP_POLICY_CONFIG , TopicConfig . CLEANUP_POLICY_DELETE ) , mockAdminClient . describeConfigs ( Collections . singleton ( resource ) ) . values ( ) . get ( resource ) . get ( ) . get ( TopicConfig . CLEANUP_POLICY_CONFIG ) ) ;
assertEquals ( new ConfigEntry ( TopicConfig . CLEANUP_POLICY_CONFIG , TopicConfig . CLEANUP_POLICY_COMPACT ) , mockAdminClient . describeConfigs ( Collections . singleton ( resource2 ) ) . values ( ) . get ( resource2 ) . get ( ) . get ( TopicConfig . CLEANUP_POLICY_CONFIG ) ) ;
assertEquals ( new ConfigEntry ( TopicConfig . CLEANUP_POLICY_CONFIG , TopicConfig . CLEANUP_POLICY_COMPACT + "," + TopicConfig . CLEANUP_POLICY_DELETE ) , mockAdminClient . describeConfigs ( Collections . singleton ( resource3 ) ) . values ( ) . get ( resource3 ) . get ( ) . get ( TopicConfig . CLEANUP_POLICY_CONFIG ) ) ;
}
@Test
public void shouldCompleteTopicValidationOnRetry ( ) {
final AdminClient admin = EasyMock . createNiceMock ( AdminClient . class ) ;
final InternalTopicManager topicManager = new InternalTopicManager ( admin , new StreamsConfig ( config ) ) ;
final TopicPartitionInfo partitionInfo = new TopicPartitionInfo ( 0 , broker1 ,
Collections . singletonList ( broker1 ) , Collections . singletonList ( broker1 ) ) ;
final KafkaFutureImpl < TopicDescription > topicDescriptionSuccessFuture = new KafkaFutureImpl < > ( ) ;
final KafkaFutureImpl < TopicDescription > topicDescriptionFailFuture = new KafkaFutureImpl < > ( ) ;
topicDescriptionSuccessFuture . complete ( new TopicDescription ( topic , false , Collections . singletonList ( partitionInfo ) , Collections . emptySet ( ) ) ) ;
topicDescriptionFailFuture . completeExceptionally ( new UnknownTopicOrPartitionException ( "KABOOM!" ) ) ;
final KafkaFutureImpl < CreateTopicsResult . TopicMetadataAndConfig > topicCreationFuture = new KafkaFutureImpl < > ( ) ;
topicCreationFuture . completeExceptionally ( new TopicExistsException ( "KABOOM!" ) ) ;
// let the first describe succeed on topic, and fail on topic2, and then let creation throws topics-existed;
// it should retry with just topic2 and then let it succeed
EasyMock . expect ( admin . describeTopics ( Utils . mkSet ( topic , topic2 ) ) )
. andReturn ( new MockDescribeTopicsResult ( Utils . mkMap (
Utils . mkEntry ( topic , topicDescriptionSuccessFuture ) ,
Utils . mkEntry ( topic2 , topicDescriptionFailFuture )
) ) ) . once ( ) ;
EasyMock . expect ( admin . createTopics ( Collections . singleton ( new NewTopic ( topic2 , Optional . of ( 1 ) , Optional . of ( ( short ) 1 ) )
. configs ( Utils . mkMap ( Utils . mkEntry ( TopicConfig . CLEANUP_POLICY_CONFIG , TopicConfig . CLEANUP_POLICY_COMPACT ) ,
Utils . mkEntry ( TopicConfig . MESSAGE_TIMESTAMP_TYPE_CONFIG , "CreateTime" ) ) ) ) ) )
. andReturn ( new MockCreateTopicsResult ( Collections . singletonMap ( topic2 , topicCreationFuture ) ) ) . once ( ) ;
EasyMock . expect ( admin . describeTopics ( Collections . singleton ( topic2 ) ) )
. andReturn ( new MockDescribeTopicsResult ( Collections . singletonMap ( topic2 , topicDescriptionSuccessFuture ) ) ) ;
EasyMock . replay ( admin ) ;
final InternalTopicConfig topicConfig = new UnwindowedChangelogTopicConfig ( topic , Collections . emptyMap ( ) ) ;
topicConfig . setNumberOfPartitions ( 1 ) ;
final InternalTopicConfig topic2Config = new UnwindowedChangelogTopicConfig ( topic2 , Collections . emptyMap ( ) ) ;
topic2Config . setNumberOfPartitions ( 1 ) ;
topicManager . makeReady ( Utils . mkMap (
Utils . mkEntry ( topic , topicConfig ) ,
Utils . mkEntry ( topic2 , topic2Config )
) ) ;
EasyMock . verify ( admin ) ;
}
@Test
@ -142,14 +194,14 @@ public class InternalTopicManagerTest {
@@ -142,14 +194,14 @@ public class InternalTopicManagerTest {
topic ,
new ArrayList < TopicPartitionInfo > ( ) {
{
add ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . < Node > emptyList ( ) ) ) ;
add ( new TopicPartitionInfo ( 1 , broker1 , singleReplica , Collections . < Node > emptyList ( ) ) ) ;
add ( new TopicPartitionInfo ( 0 , broker1 , singleReplica , Collections . emptyList ( ) ) ) ;
add ( new TopicPartitionInfo ( 1 , broker1 , singleReplica , Collections . emptyList ( ) ) ) ;
}
} ,
null ) ;
try {
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig ( topic , Collections . < String , String > emptyMap ( ) ) ;
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig ( topic , Collections . emptyMap ( ) ) ;
internalTopicConfig . setNumberOfPartitions ( 1 ) ;
internalTopicManager . makeReady ( Collections . singletonMap ( topic , internalTopicConfig ) ) ;
fail ( "Should have thrown StreamsException" ) ;
@ -161,7 +213,7 @@ public class InternalTopicManagerTest {
@@ -161,7 +213,7 @@ public class InternalTopicManagerTest {
mockAdminClient . addTopic (
false ,
topic ,
Collections . singletonList ( new TopicPartitionInfo ( 0 , broker1 , cluster , Collections . < Node > emptyList ( ) ) ) ,
Collections . singletonList ( new TopicPartitionInfo ( 0 , broker1 , cluster , Collections . emptyList ( ) ) ) ,
null ) ;
// attempt to create it again with replication 1
@ -213,11 +265,10 @@ public class InternalTopicManagerTest {
@@ -213,11 +265,10 @@ public class InternalTopicManagerTest {
topicConfigMap . put ( topic , internalTopicConfig ) ;
topicConfigMap . put ( "internal-topic" , internalTopicConfigII ) ;
internalTopicManager . makeReady ( topicConfigMap ) ;
boolean foundExpectedMessage = false ;
for ( final String message : appender . getMessages ( ) ) {
foundExpectedMessage | = message . contains ( "Topic internal-topic is unknown or not found, hence not existed yet. " ) ;
foundExpectedMessage | = message . contains ( "Topic internal-topic is unknown or not found, hence not existed yet" ) ;
}
assertTrue ( foundExpectedMessage ) ;
@ -243,4 +294,17 @@ public class InternalTopicManagerTest {
@@ -243,4 +294,17 @@ public class InternalTopicManagerTest {
}
}
private class MockCreateTopicsResult extends CreateTopicsResult {
MockCreateTopicsResult ( final Map < String , KafkaFuture < TopicMetadataAndConfig > > futures ) {
super ( futures ) ;
}
}
private class MockDescribeTopicsResult extends DescribeTopicsResult {
MockDescribeTopicsResult ( final Map < String , KafkaFuture < TopicDescription > > futures ) {
super ( futures ) ;
}
}
}