Browse Source

KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this i can see the times that it fails there is a race between when the topic is actually created/ready on the broker and when the assignment happens. When it fails `StreamPartitionAssignor.assign(..)` gets called with a `Cluster` with no topics. Hence the test hangs as no tasks get assigned. To fix this I added a `waitForTopics` method to `EmbeddedKafkaCluster`. This will wait until the topics have been created.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix

(cherry picked from commit 825f225bc5)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
pull/2423/head
Damian Guy 8 years ago committed by Guozhang Wang
parent
commit
c9b9acf6a8
  1. 1
      checkstyle/import-control.xml
  2. 4
      streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
  3. 4
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
  4. 4
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
  5. 2
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
  6. 8
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
  7. 14
      streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
  8. 24
      streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
  9. 43
      streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
  10. 3
      streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
  11. 1
      streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java

1
checkstyle/import-control.xml

@ -156,6 +156,7 @@ @@ -156,6 +156,7 @@
<subpackage name="integration">
<allow pkg="kafka.admin" />
<allow pkg="kafka.api" />
<allow pkg="kafka.server" />
<allow pkg="kafka.tools" />
<allow pkg="kafka.utils" />

4
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java

@ -81,7 +81,7 @@ public class GlobalKTableIntegrationTest { @@ -81,7 +81,7 @@ public class GlobalKTableIntegrationTest {
private ForeachAction<String, String> foreachAction;
@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
builder = new KStreamBuilder();
createTopics();
@ -212,7 +212,7 @@ public class GlobalKTableIntegrationTest { @@ -212,7 +212,7 @@ public class GlobalKTableIntegrationTest {
}
private void createTopics() {
private void createTopics() throws InterruptedException {
inputStream = "input-stream-" + testNo;
inputTable = "input-table-" + testNo;
globalOne = "globalOne-" + testNo;

4
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java

@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest { @@ -72,7 +72,7 @@ public class KStreamAggregationDedupIntegrationTest {
@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
builder = new KStreamBuilder();
createTopics();
@ -267,7 +267,7 @@ public class KStreamAggregationDedupIntegrationTest { @@ -267,7 +267,7 @@ public class KStreamAggregationDedupIntegrationTest {
}
private void createTopics() {
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testNo;
outputTopic = "output-" + testNo;
CLUSTER.createTopic(streamOneInput, 3, 1);

4
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

@ -89,7 +89,7 @@ public class KStreamAggregationIntegrationTest { @@ -89,7 +89,7 @@ public class KStreamAggregationIntegrationTest {
private KStream<Integer, String> stream;
@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
builder = new KStreamBuilder();
createTopics();
@ -637,7 +637,7 @@ public class KStreamAggregationIntegrationTest { @@ -637,7 +637,7 @@ public class KStreamAggregationIntegrationTest {
}
private void createTopics() {
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testNo;
outputTopic = "output-" + testNo;
userSessionsStream = userSessionsStream + "-" + testNo;

2
streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java

@ -74,7 +74,7 @@ public class KStreamKTableJoinIntegrationTest { @@ -74,7 +74,7 @@ public class KStreamKTableJoinIntegrationTest {
private Properties streamsConfiguration;
@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
userClicksTopic = "user-clicks-" + testNo;
userRegionsTopic = "user-regions-" + testNo;

8
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java

@ -93,7 +93,7 @@ public class KStreamRepartitionJoinTest { @@ -93,7 +93,7 @@ public class KStreamRepartitionJoinTest {
}
@Before
public void before() {
public void before() throws InterruptedException {
testNo++;
String applicationId = "kstream-repartition-join-test-" + testNo;
builder = new KStreamBuilder();
@ -146,7 +146,7 @@ public class KStreamRepartitionJoinTest { @@ -146,7 +146,7 @@ public class KStreamRepartitionJoinTest {
verifyLeftJoin(leftJoin);
}
private ExpectedOutputOnTopic mapStreamOneAndJoin() {
private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo;
doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
@ -350,7 +350,7 @@ public class KStreamRepartitionJoinTest { @@ -350,7 +350,7 @@ public class KStreamRepartitionJoinTest {
mockTime);
}
private void createTopics() {
private void createTopics() throws InterruptedException {
streamOneInput = "stream-one-" + testNo;
streamTwoInput = "stream-two-" + testNo;
streamFourInput = "stream-four-" + testNo;
@ -395,7 +395,7 @@ public class KStreamRepartitionJoinTest { @@ -395,7 +395,7 @@ public class KStreamRepartitionJoinTest {
private void doJoin(final KStream<Integer, Integer> lhs,
final KStream<Integer, String> rhs,
final String outputTopic) {
final String outputTopic) throws InterruptedException {
CLUSTER.createTopic(outputTopic);
lhs.join(rhs,
TOSTRING_JOINER,

14
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java

@ -81,6 +81,7 @@ public class QueryableStateIntegrationTest { @@ -81,6 +81,7 @@ public class QueryableStateIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
public static final int STREAM_THREE_PARTITIONS = 4;
private final MockTime mockTime = CLUSTER.time;
private String streamOne = "stream-one";
private String streamTwo = "stream-two";
@ -91,7 +92,7 @@ public class QueryableStateIntegrationTest { @@ -91,7 +92,7 @@ public class QueryableStateIntegrationTest {
private String outputTopicThree = "output-three";
// sufficiently large window size such that everything falls into 1 window
private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
private static final int NUM_PARTITIONS = 2;
private static final int STREAM_TWO_PARTITIONS = 2;
private static final int NUM_REPLICAS = NUM_BROKERS;
private Properties streamsConfiguration;
private List<String> inputValues;
@ -101,7 +102,7 @@ public class QueryableStateIntegrationTest { @@ -101,7 +102,7 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, Long>> stringLongComparator;
private static int testNo = 0;
public void createTopics() {
public void createTopics() throws InterruptedException {
streamOne = streamOne + "-" + testNo;
streamConcurrent = streamConcurrent + "-" + testNo;
streamThree = streamThree + "-" + testNo;
@ -111,8 +112,8 @@ public class QueryableStateIntegrationTest { @@ -111,8 +112,8 @@ public class QueryableStateIntegrationTest {
streamTwo = streamTwo + "-" + testNo;
CLUSTER.createTopic(streamOne);
CLUSTER.createTopic(streamConcurrent);
CLUSTER.createTopic(streamTwo, NUM_PARTITIONS, NUM_REPLICAS);
CLUSTER.createTopic(streamThree, 4, 1);
CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
CLUSTER.createTopic(outputTopic);
CLUSTER.createTopic(outputTopicConcurrent);
CLUSTER.createTopic(outputTopicThree);
@ -128,7 +129,7 @@ public class QueryableStateIntegrationTest { @@ -128,7 +129,7 @@ public class QueryableStateIntegrationTest {
}
@Before
public void before() throws IOException {
public void before() throws IOException, InterruptedException {
testNo++;
createTopics();
streamsConfiguration = new Properties();
@ -145,7 +146,6 @@ public class QueryableStateIntegrationTest { @@ -145,7 +146,6 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
stringComparator = new Comparator<KeyValue<String, String>>() {
@Override
@ -328,7 +328,7 @@ public class QueryableStateIntegrationTest { @@ -328,7 +328,7 @@ public class QueryableStateIntegrationTest {
@Test
public void queryOnRebalance() throws Exception {
final int numThreads = NUM_PARTITIONS;
final int numThreads = STREAM_TWO_PARTITIONS;
final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
final Thread[] streamThreads = new Thread[numThreads];
final int numIterations = 500000;

24
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java

@ -18,13 +18,17 @@ @@ -18,13 +18,17 @@
package org.apache.kafka.streams.integration.utils;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.TopicPartition;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
@ -34,6 +38,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { @@ -34,6 +38,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
public static final int TOPIC_CREATION_TIMEOUT = 30000;
private EmbeddedZookeeper zookeeper = null;
private final KafkaEmbedded[] brokers;
private final Properties brokerConfig;
@ -122,7 +127,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { @@ -122,7 +127,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
*
* @param topic The name of the topic.
*/
public void createTopic(final String topic) {
public void createTopic(final String topic) throws InterruptedException {
createTopic(topic, 1, 1, new Properties());
}
@ -133,7 +138,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { @@ -133,7 +138,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
* @param partitions The number of partitions for this topic.
* @param replication The replication factor for (the partitions of) this topic.
*/
public void createTopic(final String topic, final int partitions, final int replication) {
public void createTopic(final String topic, final int partitions, final int replication) throws InterruptedException {
createTopic(topic, partitions, replication, new Properties());
}
@ -148,11 +153,24 @@ public class EmbeddedKafkaCluster extends ExternalResource { @@ -148,11 +153,24 @@ public class EmbeddedKafkaCluster extends ExternalResource {
public void createTopic(final String topic,
final int partitions,
final int replication,
final Properties topicConfig) {
final Properties topicConfig) throws InterruptedException {
brokers[0].createTopic(topic, partitions, replication, topicConfig);
final List<TopicPartition> topicPartitions = new ArrayList<>();
for (int partition = 0; partition < partitions; partition++) {
topicPartitions.add(new TopicPartition(topic, partition));
}
IntegrationTestUtils.waitForTopicPartitions(brokers(), topicPartitions, TOPIC_CREATION_TIMEOUT);
}
public void deleteTopic(final String topic) {
brokers[0].deleteTopic(topic);
}
public List<KafkaServer> brokers() {
final List<KafkaServer> servers = new ArrayList<>();
for (final KafkaEmbedded broker : brokers) {
servers.add(broker.kafkaServer());
}
return servers;
}
}

43
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

@ -17,6 +17,10 @@ @@ -17,6 +17,10 @@
package org.apache.kafka.streams.integration.utils;
import kafka.api.PartitionStateInfo;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -24,12 +28,14 @@ import org.apache.kafka.clients.producer.KafkaProducer; @@ -24,12 +28,14 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import scala.Option;
import java.io.File;
import java.io.IOException;
@ -255,4 +261,41 @@ public class IntegrationTestUtils { @@ -255,4 +261,41 @@ public class IntegrationTestUtils {
return accumData;
}
public static void waitForTopicPartitions(final List<KafkaServer> servers,
final List<TopicPartition> partitions,
final long timeout) throws InterruptedException {
final long end = System.currentTimeMillis() + timeout;
for (final TopicPartition partition : partitions) {
final long remaining = end - System.currentTimeMillis();
if (remaining <= 0) {
throw new AssertionError("timed out while waiting for partitions to become available. Timeout=" + timeout);
}
waitUntilMetadataIsPropagated(servers, partition.topic(), partition.partition(), remaining);
}
}
public static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers,
final String topic,
final int partition,
final long timeout) throws InterruptedException {
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
for (final KafkaServer server : servers) {
final MetadataCache metadataCache = server.apis().metadataCache();
final Option<PartitionStateInfo> partitionInfo =
metadataCache.getPartitionInfo(topic, partition);
if (partitionInfo.isEmpty()) {
return false;
}
final PartitionStateInfo partitionStateInfo = partitionInfo.get();
if (!Request.isValidBrokerId(partitionStateInfo.leaderIsrAndControllerEpoch().leaderAndIsr().leader())) {
return false;
}
}
return true;
}
}, timeout, "metatadata for topic=" + topic + " partition=" + partition + " not propogated to all brokers");
}
}

3
streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java

@ -202,4 +202,7 @@ public class KafkaEmbedded { @@ -202,4 +202,7 @@ public class KafkaEmbedded {
zkClient.close();
}
public KafkaServer kafkaServer() {
return kafka;
}
}

1
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java

@ -66,4 +66,5 @@ public class StreamsTestUtils { @@ -66,4 +66,5 @@ public class StreamsTestUtils {
}
return results;
}
}

Loading…
Cancel
Save