diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index f093d8332d4..05d079b0c58 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -87,7 +87,7 @@ public class InternalTopicManager { * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again. */ public void makeReady(final Map topics) { - final Map existingTopicPartitions = getNumPartitions(topics.keySet(), true); + final Map existingTopicPartitions = getNumPartitions(topics.keySet()); final Set topicsToBeCreated = validateTopicPartitions(topics.values(), existingTopicPartitions); if (topicsToBeCreated.size() > 0) { final Set newTopics = new HashSet<>(); @@ -169,12 +169,8 @@ public class InternalTopicManager { /** * Get the number of partitions for the given topics */ - public Map getNumPartitions(final Set topics) { - return getNumPartitions(topics, false); - } - - private Map getNumPartitions(final Set topics, - final boolean bestEffort) { + // visible for testing + protected Map getNumPartitions(final Set topics) { int remainingRetries = retries; boolean retry; do { @@ -202,12 +198,7 @@ public class InternalTopicManager { "Will try again (remaining retries {}).", topicFuture.getKey(), remainingRetries - 1); } else { final String error = "Could not get number of partitions for topic {}."; - if (bestEffort) { - log.debug(error, topicFuture.getKey(), cause.getMessage()); - } else { - log.error(error, topicFuture.getKey(), cause); - throw new StreamsException(cause); - } + log.debug(error, topicFuture.getKey(), cause.getMessage()); } } } @@ -220,15 +211,7 @@ public class InternalTopicManager { return existingNumberOfPartitionsPerTopic; } while (remainingRetries-- > 0); - if (bestEffort) { - return Collections.emptyMap(); - } - - final String timeoutAndRetryError = "Could not get number of partitions from brokers. " + - "This can happen if the Kafka cluster is temporary not available. " + - "You can increase admin client config `retries` to be resilient against this error."; - log.error(timeoutAndRetryError); - throw new StreamsException(timeoutAndRetryError); + return Collections.emptyMap(); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index d5a1fe69121..ef9ca3de7dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -631,32 +631,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable if (!topicsToMakeReady.isEmpty()) { internalTopicManager.makeReady(topicsToMakeReady); - - // wait until each one of the topic metadata has been propagated to at least one broker - while (!allTopicsCreated(topicsToMakeReady)) { - try { - Thread.sleep(50L); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // ignore - } - } } log.debug("Completed validating internal topics in partition assignor."); } - private boolean allTopicsCreated(final Map topicsToMakeReady) { - final Map partitions = internalTopicManager.getNumPartitions(topicsToMakeReady.keySet()); - for (final InternalTopicConfig topic : topicsToMakeReady.values()) { - final Integer numPartitions = partitions.get(topic.name()); - if (numPartitions == null || !numPartitions.equals(topic.numberOfPartitions())) { - return false; - } - } - return true; - } - private void ensureCopartitioning(Collection> copartitionGroups, Map allRepartitionTopicsNumPartitions, Cluster metadata) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 3210b237198..d9189d4bef1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; @@ -32,17 +31,14 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class InternalTopicManagerTest { @@ -82,7 +78,7 @@ public class InternalTopicManagerTest { } @After - public void shutdown() throws IOException { + public void shutdown() { mockAdminClient.close(); } @@ -96,40 +92,6 @@ public class InternalTopicManagerTest { assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic))); } - @Test - public void shouldFailWithUnknownTopicException() { - mockAdminClient.addTopic( - false, - topic, - Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())), - null); - - try { - internalTopicManager.getNumPartitions(new HashSet() { - { - add(topic); - add(topic2); - } - }); - fail("Should have thrown UnknownTopicOrPartitionException."); - } catch (final StreamsException expected) { - assertTrue(expected.getCause() instanceof UnknownTopicOrPartitionException); - } - } - - @Test - public void shouldExhaustRetriesOnTimeoutExceptionForGetNumPartitions() { - mockAdminClient.timeoutNextRequest(2); - - try { - internalTopicManager.getNumPartitions(Collections.singleton(topic)); - fail("Should have thrown StreamsException."); - } catch (final StreamsException expected) { - assertNull(expected.getCause()); - assertEquals("Could not get number of partitions from brokers. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.", expected.getMessage()); - } - } - @Test public void shouldCreateRequiredTopics() throws Exception { final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java index ae73280316a..ea27045b9f5 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java @@ -60,7 +60,7 @@ public class MockInternalTopicManager extends InternalTopicManager { } @Override - public Map getNumPartitions(final Set topics) { + protected Map getNumPartitions(final Set topics) { final Map partitions = new HashMap<>(); for (String topic : topics) { partitions.put(topic, restoreConsumer.partitionsFor(topic) == null ? null : restoreConsumer.partitionsFor(topic).size());