From 49587750570a2aaf332eb73f5aad373196f984ce Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 23 May 2017 11:53:13 -0700 Subject: [PATCH] MINOR: improve EmbeddedKafkaCluster test utility for deleting topics Author: Matthias J. Sax Reviewers: Bill Bejeck, Damian Guy, Eno Thereska, Guozhang Wang Closes #3104 from mjsax/minor-improve-embedded-kafka-cluster --- .../integration/FanoutIntegrationTest.java | 4 +- .../GlobalKTableIntegrationTest.java | 3 +- .../InternalTopicIntegrationTest.java | 3 +- .../integration/JoinIntegrationTest.java | 41 +----- .../KStreamAggregationIntegrationTest.java | 3 +- .../KStreamKTableJoinIntegrationTest.java | 4 +- ...msFineGrainedAutoResetIntegrationTest.java | 47 +++---- .../KTableKTableJoinIntegrationTest.java | 5 +- .../QueryableStateIntegrationTest.java | 7 +- .../RegexSourceIntegrationTest.java | 23 ++-- .../integration/ResetIntegrationTest.java | 77 +----------- .../utils/EmbeddedKafkaCluster.java | 118 +++++++++++++++++- 12 files changed, 163 insertions(+), 172 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java index 859331719b7..f1f09a40a02 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java @@ -79,9 +79,7 @@ public class FanoutIntegrationTest { @BeforeClass public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(INPUT_TOPIC_A); - CLUSTER.createTopic(OUTPUT_TOPIC_B); - CLUSTER.createTopic(OUTPUT_TOPIC_C); + CLUSTER.createTopics(INPUT_TOPIC_A, OUTPUT_TOPIC_B, OUTPUT_TOPIC_C); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index e5ed3d8b7ea..869c255e126 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -222,8 +222,7 @@ public class GlobalKTableIntegrationTest { inputStream = "input-stream-" + testNo; inputTable = "input-table-" + testNo; globalOne = "globalOne-" + testNo; - CLUSTER.createTopic(inputStream); - CLUSTER.createTopic(inputTable); + CLUSTER.createTopics(inputStream, inputTable); CLUSTER.createTopic(globalOne, 2, 1); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 29cdc1b7bcc..3658d10079d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -77,8 +77,7 @@ public class InternalTopicIntegrationTest { @BeforeClass public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(DEFAULT_INPUT_TOPIC); - CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC); + CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC); } @Before diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index 040c784c0b9..b69e58b3eae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -16,10 +16,8 @@ */ package org.apache.kafka.streams.integration; -import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -36,10 +34,8 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -48,11 +44,9 @@ import org.junit.experimental.categories.Category; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Properties; -import java.util.Set; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -65,8 +59,6 @@ public class JoinIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - private static ZkUtils zkUtils = null; - private static final String APP_ID = "join-integration-test"; private static final String INPUT_TOPIC_1 = "inputTopicLeft"; private static final String INPUT_TOPIC_2 = "inputTopicRight"; @@ -107,8 +99,6 @@ public class JoinIntegrationTest { } }; - private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition(); - @BeforeClass public static void setupConfigsAndUtils() throws Exception { PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -131,25 +121,11 @@ public class JoinIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - - zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()); - } - - @AfterClass - public static void release() { - if (zkUtils != null) { - zkUtils.close(); - } } @Before public void prepareTopology() throws Exception { - CLUSTER.createTopic(INPUT_TOPIC_1); - CLUSTER.createTopic(INPUT_TOPIC_2); - CLUSTER.createTopic(OUTPUT_TOPIC); + CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC); builder = new KStreamBuilder(); leftTable = builder.table(INPUT_TOPIC_1, "leftTable"); @@ -160,11 +136,7 @@ public class JoinIntegrationTest { @After public void cleanup() throws Exception { - CLUSTER.deleteTopic(INPUT_TOPIC_1); - CLUSTER.deleteTopic(INPUT_TOPIC_2); - CLUSTER.deleteTopic(OUTPUT_TOPIC); - - TestUtils.waitForCondition(topicsGotDeleted, 120000, "Topics not deleted after 120 seconds."); + CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC); } private void checkResult(final String outputTopic, final List expectedResult) throws Exception { @@ -414,15 +386,6 @@ public class JoinIntegrationTest { runTest(expectedResult); } - private final class TopicsGotDeletedCondition implements TestCondition { - @Override - public boolean conditionMet() { - final Set allTopics = new HashSet<>(); - allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); - return !allTopics.contains(INPUT_TOPIC_1) && !allTopics.contains(INPUT_TOPIC_2) && !allTopics.contains(OUTPUT_TOPIC); - } - } - private final class Input { String topic; KeyValue record; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 07222106b41..0d5472c617e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -662,8 +662,7 @@ public class KStreamAggregationIntegrationTest { outputTopic = "output-" + testNo; userSessionsStream = userSessionsStream + "-" + testNo; CLUSTER.createTopic(streamOneInput, 3, 1); - CLUSTER.createTopic(userSessionsStream); - CLUSTER.createTopic(outputTopic); + CLUSTER.createTopics(userSessionsStream, outputTopic); } private void startStreams() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index 9b5b428cc6d..5a175665646 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -80,9 +80,7 @@ public class KStreamKTableJoinIntegrationTest { userRegionsTopic = "user-regions-" + testNo; userRegionsStoreName = "user-regions-store-name-" + testNo; outputTopic = "output-topic-" + testNo; - CLUSTER.createTopic(userClicksTopic); - CLUSTER.createTopic(userRegionsTopic); - CLUSTER.createTopic(outputTopic); + CLUSTER.createTopics(userClicksTopic, userRegionsTopic, outputTopic); streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java index cff5f43460b..a868839e10d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java @@ -104,29 +104,30 @@ public class KStreamsFineGrainedAutoResetIntegrationTest { @BeforeClass public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(TOPIC_1_0); - CLUSTER.createTopic(TOPIC_2_0); - CLUSTER.createTopic(TOPIC_A_0); - CLUSTER.createTopic(TOPIC_C_0); - CLUSTER.createTopic(TOPIC_Y_0); - CLUSTER.createTopic(TOPIC_Z_0); - CLUSTER.createTopic(TOPIC_1_1); - CLUSTER.createTopic(TOPIC_2_1); - CLUSTER.createTopic(TOPIC_A_1); - CLUSTER.createTopic(TOPIC_C_1); - CLUSTER.createTopic(TOPIC_Y_1); - CLUSTER.createTopic(TOPIC_Z_1); - CLUSTER.createTopic(TOPIC_1_2); - CLUSTER.createTopic(TOPIC_2_2); - CLUSTER.createTopic(TOPIC_A_2); - CLUSTER.createTopic(TOPIC_C_2); - CLUSTER.createTopic(TOPIC_Y_2); - CLUSTER.createTopic(TOPIC_Z_2); - CLUSTER.createTopic(NOOP); - CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC); - CLUSTER.createTopic(OUTPUT_TOPIC_0); - CLUSTER.createTopic(OUTPUT_TOPIC_1); - CLUSTER.createTopic(OUTPUT_TOPIC_2); + CLUSTER.createTopics( + TOPIC_1_0, + TOPIC_2_0, + TOPIC_A_0, + TOPIC_C_0, + TOPIC_Y_0, + TOPIC_Z_0, + TOPIC_1_1, + TOPIC_2_1, + TOPIC_A_1, + TOPIC_C_1, + TOPIC_Y_1, + TOPIC_Z_1, + TOPIC_1_2, + TOPIC_2_2, + TOPIC_A_2, + TOPIC_C_2, + TOPIC_Y_2, + TOPIC_Z_2, + NOOP, + DEFAULT_OUTPUT_TOPIC, + OUTPUT_TOPIC_0, + OUTPUT_TOPIC_1, + OUTPUT_TOPIC_2); } @Before diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index c77cf3b08be..4426a5e3af4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -71,10 +71,7 @@ public class KTableKTableJoinIntegrationTest { @BeforeClass public static void beforeTest() throws Exception { - CLUSTER.createTopic(TABLE_1); - CLUSTER.createTopic(TABLE_2); - CLUSTER.createTopic(TABLE_3); - CLUSTER.createTopic(OUTPUT); + CLUSTER.createTopics(TABLE_1, TABLE_2, TABLE_3, OUTPUT); streamsConfig = new Properties(); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index f2d0427550f..509a7fdb92a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -115,13 +115,10 @@ public class QueryableStateIntegrationTest { outputTopicConcurrent = outputTopicConcurrent + "-" + testNo; outputTopicThree = outputTopicThree + "-" + testNo; streamTwo = streamTwo + "-" + testNo; - CLUSTER.createTopic(streamOne); - CLUSTER.createTopic(streamConcurrent); + CLUSTER.createTopics(streamOne, streamConcurrent); CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS); CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1); - CLUSTER.createTopic(outputTopic); - CLUSTER.createTopic(outputTopicConcurrent); - CLUSTER.createTopic(outputTopicThree); + CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicThree); } @Before diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 011bca6b789..0b5c5e94430 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -97,18 +97,18 @@ public class RegexSourceIntegrationTest { @BeforeClass public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(TOPIC_1); - CLUSTER.createTopic(TOPIC_2); - CLUSTER.createTopic(TOPIC_A); - CLUSTER.createTopic(TOPIC_C); - CLUSTER.createTopic(TOPIC_Y); - CLUSTER.createTopic(TOPIC_Z); - CLUSTER.createTopic(FA_TOPIC); - CLUSTER.createTopic(FOO_TOPIC); + CLUSTER.createTopics( + TOPIC_1, + TOPIC_2, + TOPIC_A, + TOPIC_C, + TOPIC_Y, + TOPIC_Z, + FA_TOPIC, + FOO_TOPIC, + DEFAULT_OUTPUT_TOPIC); CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1); CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1); - CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC); - } @Before @@ -191,8 +191,7 @@ public class RegexSourceIntegrationTest { final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); - CLUSTER.createTopic("TEST-TOPIC-A"); - CLUSTER.createTopic("TEST-TOPIC-B"); + CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B"); final KStreamBuilder builder = new KStreamBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 626f38d73de..31a146547f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -23,7 +23,6 @@ import kafka.utils.MockTime; import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -194,25 +193,7 @@ public class ResetIntegrationTest { "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); cleanGlobal(INTERMEDIATE_USER_TOPIC); - CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC); - Set allTopics; - ZkUtils zkUtils = null; - try { - zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()); - - do { - Utils.sleep(100); - allTopics = new HashSet<>(); - allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); - } while (allTopics.contains(INTERMEDIATE_USER_TOPIC)); - } finally { - if (zkUtils != null) { - zkUtils.close(); - } - } + CLUSTER.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); } @Test @@ -281,33 +262,7 @@ public class ResetIntegrationTest { } private void prepareInputData() throws Exception { - try { - CLUSTER.deleteTopic(INPUT_TOPIC); - } catch (final UnknownTopicOrPartitionException e) { - // ignore - } - try { - CLUSTER.deleteTopic(OUTPUT_TOPIC); - } catch (final UnknownTopicOrPartitionException e) { - // ignore - } - try { - CLUSTER.deleteTopic(OUTPUT_TOPIC_2); - } catch (final UnknownTopicOrPartitionException e) { - // ignore - } - try { - CLUSTER.deleteTopic(OUTPUT_TOPIC_2_RERUN); - } catch (final UnknownTopicOrPartitionException e) { - // ignore - } - - waitUntilUserTopicsAreDeleted(); - - CLUSTER.createTopic(INPUT_TOPIC); - CLUSTER.createTopic(OUTPUT_TOPIC); - CLUSTER.createTopic(OUTPUT_TOPIC_2); - CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN); + CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class); @@ -406,34 +361,6 @@ public class ResetIntegrationTest { Assert.assertEquals(0, exitCode); } - private void waitUntilUserTopicsAreDeleted() { - ZkUtils zkUtils = null; - try { - zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()); - - while (userTopicExists(new HashSet<>(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())))) { - Utils.sleep(100); - } - } finally { - if (zkUtils != null) { - zkUtils.close(); - } - } - } - - private boolean userTopicExists(final Set allTopics) { - final Set expectedMissingTopics = new HashSet<>(); - expectedMissingTopics.add(INPUT_TOPIC); - expectedMissingTopics.add(OUTPUT_TOPIC); - expectedMissingTopics.add(OUTPUT_TOPIC_2); - expectedMissingTopics.add(OUTPUT_TOPIC_2_RERUN); - - return expectedMissingTopics.removeAll(allTopics); - } - private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) { final Set expectedRemainingTopicsAfterCleanup = new HashSet<>(); expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 6a0fc517fb2..e738bc698af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -19,16 +19,24 @@ package org.apache.kafka.streams.integration.utils; import kafka.server.KafkaConfig$; import kafka.server.KafkaServer; import kafka.utils.MockTime; +import kafka.utils.ZkUtils; import kafka.zk.EmbeddedZookeeper; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Set; /** * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker. @@ -38,8 +46,11 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected private static final int TOPIC_CREATION_TIMEOUT = 30000; + private static final int TOPIC_DELETION_TIMEOUT = 30000; private EmbeddedZookeeper zookeeper = null; private final KafkaEmbedded[] brokers; + private ZkUtils zkUtils = null; + private final Properties brokerConfig; public final MockTime time; @@ -77,6 +88,12 @@ public class EmbeddedKafkaCluster extends ExternalResource { zookeeper = new EmbeddedZookeeper(); log.debug("ZooKeeper instance is running at {}", zKConnectString()); + zkUtils = ZkUtils.apply( + zKConnectString(), + 30000, + 30000, + JaasUtils.isZkSecurityEnabled()); + brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT); putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); @@ -109,6 +126,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { for (final KafkaEmbedded broker : brokers) { broker.stop(); } + zkUtils.close(); zookeeper.shutdown(); } @@ -141,6 +159,17 @@ public class EmbeddedKafkaCluster extends ExternalResource { stop(); } + /** + * Create multiple Kafka topics each with 1 partition and a replication factor of 1. + * + * @param topics The name of the topics. + */ + public void createTopics(final String... topics) throws InterruptedException { + for (final String topic : topics) { + createTopic(topic, 1, 1, new Properties()); + } + } + /** * Create a Kafka topic with 1 partition and a replication factor of 1. * @@ -181,8 +210,93 @@ public class EmbeddedKafkaCluster extends ExternalResource { IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT); } - public void deleteTopic(final String topic) { - brokers[0].deleteTopic(topic); + /** + * Deletes a topic returns immediately. + * + * @param topic the name of the topic + */ + public void deleteTopic(final String topic) throws Exception { + deleteTopicsAndWait(-1L, topic); + } + + /** + * Deletes a topic and blocks for max 30 sec until the topic got deleted. + * + * @param topic the name of the topic + */ + public void deleteTopicAndWait(final String topic) throws Exception { + deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic); + } + + /** + * Deletes a topic and blocks until the topic got deleted. + * + * @param timeoutMs the max time to wait for the topic to be deleted (does not block if {@code <= 0}) + * @param topic the name of the topic + */ + public void deleteTopicAndWait(final long timeoutMs, final String topic) throws Exception { + deleteTopicsAndWait(timeoutMs, topic); + } + + /** + * Deletes multiple topics returns immediately. + * + * @param topics the name of the topics + */ + public void deleteTopics(final String... topics) throws Exception { + deleteTopicsAndWait(-1, topics); + } + + /** + * Deletes multiple topics and blocks for max 30 sec until all topics got deleted. + * + * @param topics the name of the topics + */ + public void deleteTopicsAndWait(final String... topics) throws Exception { + deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics); + } + + /** + * Deletes multiple topics and blocks until all topics got deleted. + * + * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0}) + * @param topics the name of the topics + */ + public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws Exception { + for (final String topic : topics) { + try { + brokers[0].deleteTopic(topic); + } catch (final UnknownTopicOrPartitionException e) { } + } + + if (timeoutMs > 0) { + TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds."); + } + } + + public void deleteAndRecreateTopics(final String... topics) throws Exception { + deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics); + createTopics(topics); + } + + public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws Exception { + deleteTopicsAndWait(timeoutMs, topics); + createTopics(topics); + } + + private final class TopicsDeletedCondition implements TestCondition { + final Set deletedTopic = new HashSet<>(); + + private TopicsDeletedCondition(final String... topics) { + Collections.addAll(deletedTopic, topics); + } + + @Override + public boolean conditionMet() { + final Set allTopics = new HashSet<>(); + allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); + return !allTopics.removeAll(deletedTopic); + } } public List brokers() {