Browse Source

HOTFIX: turn off auto topic creation in embedded kafka cluster

Turning off auto topic creation in the EmbeddedKafkaCluster used by Streams as it can cause race conditions that lead to build hangs.
Fixed the couple of tests that needed to have some topics manually created

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska <eno.thereska@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1941 from dguy/disable-auto-topic-create
pull/1941/merge
Damian Guy 8 years ago committed by Ismael Juma
parent
commit
43c30b8c39
  1. 3
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
  2. 1
      streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
  3. 1
      streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java

3
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java

@ -222,6 +222,7 @@ public class KStreamRepartitionJoinTest { @@ -222,6 +222,7 @@ public class KStreamRepartitionJoinTest {
};
final String output = "join-rhs-stream-mapped-" + testNo;
CLUSTER.createTopic(output);
streamTwo
.join(streamOne.map(keyMapper),
joiner,
@ -241,6 +242,7 @@ public class KStreamRepartitionJoinTest { @@ -241,6 +242,7 @@ public class KStreamRepartitionJoinTest {
final String outputTopic = "left-join-" + testNo;
CLUSTER.createTopic(outputTopic);
map1.leftJoin(map2,
valueJoiner,
getJoinWindow(),
@ -275,6 +277,7 @@ public class KStreamRepartitionJoinTest { @@ -275,6 +277,7 @@ public class KStreamRepartitionJoinTest {
};
final String topic = "map-join-join-" + testNo;
CLUSTER.createTopic(topic);
join.map(kvMapper)
.join(streamFour.map(kvMapper),
joiner,

1
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java

@ -101,6 +101,7 @@ public class RegexSourceIntegrationTest { @@ -101,6 +101,7 @@ public class RegexSourceIntegrationTest {
CLUSTER.createTopic(FOO_TOPIC);
CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
}

1
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java

@ -58,6 +58,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { @@ -58,6 +58,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
brokerConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);

Loading…
Cancel
Save