@ -19,16 +19,24 @@ package org.apache.kafka.streams.integration.utils;
@@ -19,16 +19,24 @@ package org.apache.kafka.streams.integration.utils;
import kafka.server.KafkaConfig $ ;
import kafka.server.KafkaServer ;
import kafka.utils.MockTime ;
import kafka.utils.ZkUtils ;
import kafka.zk.EmbeddedZookeeper ;
import org.apache.kafka.common.TopicPartition ;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException ;
import org.apache.kafka.common.security.JaasUtils ;
import org.apache.kafka.test.TestCondition ;
import org.apache.kafka.test.TestUtils ;
import org.junit.rules.ExternalResource ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.io.IOException ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Properties ;
import java.util.Set ;
/ * *
* Runs an in - memory , "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker .
@ -38,8 +46,11 @@ public class EmbeddedKafkaCluster extends ExternalResource {
@@ -38,8 +46,11 @@ public class EmbeddedKafkaCluster extends ExternalResource {
private static final Logger log = LoggerFactory . getLogger ( EmbeddedKafkaCluster . class ) ;
private static final int DEFAULT_BROKER_PORT = 0 ; // 0 results in a random port being selected
private static final int TOPIC_CREATION_TIMEOUT = 30000 ;
private static final int TOPIC_DELETION_TIMEOUT = 30000 ;
private EmbeddedZookeeper zookeeper = null ;
private final KafkaEmbedded [ ] brokers ;
private ZkUtils zkUtils = null ;
private final Properties brokerConfig ;
public final MockTime time ;
@ -77,6 +88,12 @@ public class EmbeddedKafkaCluster extends ExternalResource {
@@ -77,6 +88,12 @@ public class EmbeddedKafkaCluster extends ExternalResource {
zookeeper = new EmbeddedZookeeper ( ) ;
log . debug ( "ZooKeeper instance is running at {}" , zKConnectString ( ) ) ;
zkUtils = ZkUtils . apply (
zKConnectString ( ) ,
30000 ,
30000 ,
JaasUtils . isZkSecurityEnabled ( ) ) ;
brokerConfig . put ( KafkaConfig$ . MODULE$ . ZkConnectProp ( ) , zKConnectString ( ) ) ;
brokerConfig . put ( KafkaConfig$ . MODULE$ . PortProp ( ) , DEFAULT_BROKER_PORT ) ;
putIfAbsent ( brokerConfig , KafkaConfig$ . MODULE$ . DeleteTopicEnableProp ( ) , true ) ;
@ -109,6 +126,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
@@ -109,6 +126,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
for ( final KafkaEmbedded broker : brokers ) {
broker . stop ( ) ;
}
zkUtils . close ( ) ;
zookeeper . shutdown ( ) ;
}
@ -141,6 +159,17 @@ public class EmbeddedKafkaCluster extends ExternalResource {
@@ -141,6 +159,17 @@ public class EmbeddedKafkaCluster extends ExternalResource {
stop ( ) ;
}
/ * *
* Create multiple Kafka topics each with 1 partition and a replication factor of 1 .
*
* @param topics The name of the topics .
* /
public void createTopics ( final String . . . topics ) throws InterruptedException {
for ( final String topic : topics ) {
createTopic ( topic , 1 , 1 , new Properties ( ) ) ;
}
}
/ * *
* Create a Kafka topic with 1 partition and a replication factor of 1 .
*
@ -181,8 +210,93 @@ public class EmbeddedKafkaCluster extends ExternalResource {
@@ -181,8 +210,93 @@ public class EmbeddedKafkaCluster extends ExternalResource {
IntegrationTestUtils . waitForTopicPartitions ( brokers ( ) , topicPartitions , TOPIC_CREATION_TIMEOUT ) ;
}
public void deleteTopic ( final String topic ) {
brokers [ 0 ] . deleteTopic ( topic ) ;
/ * *
* Deletes a topic returns immediately .
*
* @param topic the name of the topic
* /
public void deleteTopic ( final String topic ) throws Exception {
deleteTopicsAndWait ( - 1L , topic ) ;
}
/ * *
* Deletes a topic and blocks for max 30 sec until the topic got deleted .
*
* @param topic the name of the topic
* /
public void deleteTopicAndWait ( final String topic ) throws Exception {
deleteTopicsAndWait ( TOPIC_DELETION_TIMEOUT , topic ) ;
}
/ * *
* Deletes a topic and blocks until the topic got deleted .
*
* @param timeoutMs the max time to wait for the topic to be deleted ( does not block if { @code < = 0 } )
* @param topic the name of the topic
* /
public void deleteTopicAndWait ( final long timeoutMs , final String topic ) throws Exception {
deleteTopicsAndWait ( timeoutMs , topic ) ;
}
/ * *
* Deletes multiple topics returns immediately .
*
* @param topics the name of the topics
* /
public void deleteTopics ( final String . . . topics ) throws Exception {
deleteTopicsAndWait ( - 1 , topics ) ;
}
/ * *
* Deletes multiple topics and blocks for max 30 sec until all topics got deleted .
*
* @param topics the name of the topics
* /
public void deleteTopicsAndWait ( final String . . . topics ) throws Exception {
deleteTopicsAndWait ( TOPIC_DELETION_TIMEOUT , topics ) ;
}
/ * *
* Deletes multiple topics and blocks until all topics got deleted .
*
* @param timeoutMs the max time to wait for the topics to be deleted ( does not block if { @code < = 0 } )
* @param topics the name of the topics
* /
public void deleteTopicsAndWait ( final long timeoutMs , final String . . . topics ) throws Exception {
for ( final String topic : topics ) {
try {
brokers [ 0 ] . deleteTopic ( topic ) ;
} catch ( final UnknownTopicOrPartitionException e ) { }
}
if ( timeoutMs > 0 ) {
TestUtils . waitForCondition ( new TopicsDeletedCondition ( topics ) , timeoutMs , "Topics not deleted after " + timeoutMs + " milli seconds." ) ;
}
}
public void deleteAndRecreateTopics ( final String . . . topics ) throws Exception {
deleteTopicsAndWait ( TOPIC_DELETION_TIMEOUT , topics ) ;
createTopics ( topics ) ;
}
public void deleteAndRecreateTopics ( final long timeoutMs , final String . . . topics ) throws Exception {
deleteTopicsAndWait ( timeoutMs , topics ) ;
createTopics ( topics ) ;
}
private final class TopicsDeletedCondition implements TestCondition {
final Set < String > deletedTopic = new HashSet < > ( ) ;
private TopicsDeletedCondition ( final String . . . topics ) {
Collections . addAll ( deletedTopic , topics ) ;
}
@Override
public boolean conditionMet ( ) {
final Set < String > allTopics = new HashSet < > ( ) ;
allTopics . addAll ( scala . collection . JavaConversions . seqAsJavaList ( zkUtils . getAllTopics ( ) ) ) ;
return ! allTopics . removeAll ( deletedTopic ) ;
}
}
public List < KafkaServer > brokers ( ) {