Browse Source

KAFKA-6126: Remove unnecessary topics created check

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4322 from mjsax/kafka-6126-remove-topic-check-on-rebalance-2
pull/4348/merge
Matthias J. Sax 7 years ago committed by Guozhang Wang
parent
commit
dca1474b4b
  1. 27
      streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
  2. 21
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
  3. 40
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
  4. 2
      streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java

27
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java

@ -87,7 +87,7 @@ public class InternalTopicManager { @@ -87,7 +87,7 @@ public class InternalTopicManager {
* If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
*/
public void makeReady(final Map<String, InternalTopicConfig> topics) {
final Map<String, Integer> existingTopicPartitions = getNumPartitions(topics.keySet(), true);
final Map<String, Integer> existingTopicPartitions = getNumPartitions(topics.keySet());
final Set<InternalTopicConfig> topicsToBeCreated = validateTopicPartitions(topics.values(), existingTopicPartitions);
if (topicsToBeCreated.size() > 0) {
final Set<NewTopic> newTopics = new HashSet<>();
@ -169,12 +169,8 @@ public class InternalTopicManager { @@ -169,12 +169,8 @@ public class InternalTopicManager {
/**
* Get the number of partitions for the given topics
*/
public Map<String, Integer> getNumPartitions(final Set<String> topics) {
return getNumPartitions(topics, false);
}
private Map<String, Integer> getNumPartitions(final Set<String> topics,
final boolean bestEffort) {
// visible for testing
protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
int remainingRetries = retries;
boolean retry;
do {
@ -202,12 +198,7 @@ public class InternalTopicManager { @@ -202,12 +198,7 @@ public class InternalTopicManager {
"Will try again (remaining retries {}).", topicFuture.getKey(), remainingRetries - 1);
} else {
final String error = "Could not get number of partitions for topic {}.";
if (bestEffort) {
log.debug(error, topicFuture.getKey(), cause.getMessage());
} else {
log.error(error, topicFuture.getKey(), cause);
throw new StreamsException(cause);
}
log.debug(error, topicFuture.getKey(), cause.getMessage());
}
}
}
@ -220,15 +211,7 @@ public class InternalTopicManager { @@ -220,15 +211,7 @@ public class InternalTopicManager {
return existingNumberOfPartitionsPerTopic;
} while (remainingRetries-- > 0);
if (bestEffort) {
return Collections.emptyMap();
}
final String timeoutAndRetryError = "Could not get number of partitions from brokers. " +
"This can happen if the Kafka cluster is temporary not available. " +
"You can increase admin client config `retries` to be resilient against this error.";
log.error(timeoutAndRetryError);
throw new StreamsException(timeoutAndRetryError);
return Collections.emptyMap();
}
/**

21
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

@ -631,32 +631,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable @@ -631,32 +631,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
if (!topicsToMakeReady.isEmpty()) {
internalTopicManager.makeReady(topicsToMakeReady);
// wait until each one of the topic metadata has been propagated to at least one broker
while (!allTopicsCreated(topicsToMakeReady)) {
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
}
}
log.debug("Completed validating internal topics in partition assignor.");
}
private boolean allTopicsCreated(final Map<String, InternalTopicConfig> topicsToMakeReady) {
final Map<String, Integer> partitions = internalTopicManager.getNumPartitions(topicsToMakeReady.keySet());
for (final InternalTopicConfig topic : topicsToMakeReady.values()) {
final Integer numPartitions = partitions.get(topic.name());
if (numPartitions == null || !numPartitions.equals(topic.numberOfPartitions())) {
return false;
}
}
return true;
}
private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
Cluster metadata) {

40
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java

@ -24,7 +24,6 @@ import org.apache.kafka.common.Node; @@ -24,7 +24,6 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@ -32,17 +31,14 @@ import org.junit.After; @@ -32,17 +31,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class InternalTopicManagerTest {
@ -82,7 +78,7 @@ public class InternalTopicManagerTest { @@ -82,7 +78,7 @@ public class InternalTopicManagerTest {
}
@After
public void shutdown() throws IOException {
public void shutdown() {
mockAdminClient.close();
}
@ -96,40 +92,6 @@ public class InternalTopicManagerTest { @@ -96,40 +92,6 @@ public class InternalTopicManagerTest {
assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
}
@Test
public void shouldFailWithUnknownTopicException() {
mockAdminClient.addTopic(
false,
topic,
Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
null);
try {
internalTopicManager.getNumPartitions(new HashSet<String>() {
{
add(topic);
add(topic2);
}
});
fail("Should have thrown UnknownTopicOrPartitionException.");
} catch (final StreamsException expected) {
assertTrue(expected.getCause() instanceof UnknownTopicOrPartitionException);
}
}
@Test
public void shouldExhaustRetriesOnTimeoutExceptionForGetNumPartitions() {
mockAdminClient.timeoutNextRequest(2);
try {
internalTopicManager.getNumPartitions(Collections.singleton(topic));
fail("Should have thrown StreamsException.");
} catch (final StreamsException expected) {
assertNull(expected.getCause());
assertEquals("Could not get number of partitions from brokers. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.", expected.getMessage());
}
}
@Test
public void shouldCreateRequiredTopics() throws Exception {
final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap());

2
streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java

@ -60,7 +60,7 @@ public class MockInternalTopicManager extends InternalTopicManager { @@ -60,7 +60,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
}
@Override
public Map<String, Integer> getNumPartitions(final Set<String> topics) {
protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
final Map<String, Integer> partitions = new HashMap<>();
for (String topic : topics) {
partitions.put(topic, restoreConsumer.partitionsFor(topic) == null ? null : restoreConsumer.partitionsFor(topic).size());

Loading…
Cancel
Save