diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java index 85c0a905240..eac086a34c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java @@ -35,7 +35,7 @@ public class CreateTopicsResult { private final Map> futures; - CreateTopicsResult(Map> futures) { + protected CreateTopicsResult(Map> futures) { this.futures = futures; } @@ -94,7 +94,7 @@ public class CreateTopicsResult { return futures.get(topic).thenApply(TopicMetadataAndConfig::replicationFactor); } - static class TopicMetadataAndConfig { + public static class TopicMetadataAndConfig { private final ApiException exception; private final int numPartitions; private final int replicationFactor; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java index ffb6470e503..7753984a7bd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java @@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException; public class DescribeTopicsResult { private final Map> futures; - DescribeTopicsResult(Map> futures) { + protected DescribeTopicsResult(Map> futures) { this.futures = futures; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 52332f88206..281a8c66851 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1037,7 +1037,7 @@ public class KafkaAdminClient extends AdminClient { } ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true, requestTimeoutMs, null); - log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId()); + log.debug("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId()); client.send(clientRequest, now); getOrCreateListValue(callsInFlight, node.idString()).add(call); correlationIdToCalls.put(clientRequest.correlationId(), call); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index ab214931ede..e6fb9263dd5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -60,8 +60,7 @@ public class InternalTopicManager { private final int retries; private final long retryBackOffMs; - public InternalTopicManager(final Admin adminClient, - final StreamsConfig streamsConfig) { + public InternalTopicManager(final Admin adminClient, final StreamsConfig streamsConfig) { this.adminClient = adminClient; final LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())); @@ -202,7 +201,7 @@ public class InternalTopicManager { if (cause instanceof UnknownTopicOrPartitionException || cause instanceof LeaderNotAvailableException) { // This topic didn't exist or leader is not known yet, proceed to try to create it - log.debug("Topic {} is unknown or not found, hence not existed yet.", topicName); + log.debug("Topic {} is unknown or not found, hence not existed yet: {}", topicName, cause.toString()); } else { log.error("Unexpected error during topic description for {}.\n" + "Error message was: {}", topicName, cause.toString()); @@ -217,15 +216,17 @@ public class InternalTopicManager { /** * Check the existing topics to have correct number of partitions; and return the remaining topics that needs to be created */ - private Set validateTopics(final Set topicsToValidate, - final Map topicsMap) { + private Set validateTopics(final Set topicsToValidate, final Map topicsMap) { + if (!topicsMap.keySet().containsAll(topicsToValidate)) { + throw new IllegalStateException("The topics map " + topicsMap.keySet() + " does not contain all the topics " + + topicsToValidate + " trying to validate."); + } final Map existedTopicPartition = getNumPartitions(topicsToValidate); final Set topicsToCreate = new HashSet<>(); - for (final Map.Entry entry : topicsMap.entrySet()) { - final String topicName = entry.getKey(); - final Optional numberOfPartitions = entry.getValue().numberOfPartitions(); + for (final String topicName : topicsToValidate) { + final Optional numberOfPartitions = topicsMap.get(topicName).numberOfPartitions(); if (existedTopicPartition.containsKey(topicName) && numberOfPartitions.isPresent()) { if (!existedTopicPartition.get(topicName).equals(numberOfPartitions.get())) { final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " + diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 074228a6c14..579e45897d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -16,19 +16,28 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -38,6 +47,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -90,18 +100,18 @@ public class InternalTopicManagerTest { mockAdminClient.addTopic( false, topic, - Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())), + Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())), null); assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic))); } @Test public void shouldCreateRequiredTopics() throws Exception { - final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); + final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); topicConfig.setNumberOfPartitions(1); - final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap()); + final InternalTopicConfig topicConfig2 = new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap()); topicConfig2.setNumberOfPartitions(1); - final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig(topic3, Collections.emptyMap()); + final InternalTopicConfig topicConfig3 = new WindowedChangelogTopicConfig(topic3, Collections.emptyMap()); topicConfig3.setNumberOfPartitions(1); internalTopicManager.makeReady(Collections.singletonMap(topic, topicConfig)); @@ -111,17 +121,17 @@ public class InternalTopicManagerTest { assertEquals(Utils.mkSet(topic, topic2, topic3), mockAdminClient.listTopics().names().get()); assertEquals(new TopicDescription(topic, false, new ArrayList() { { - add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); + add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); } }), mockAdminClient.describeTopics(Collections.singleton(topic)).values().get(topic).get()); assertEquals(new TopicDescription(topic2, false, new ArrayList() { { - add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); + add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); } }), mockAdminClient.describeTopics(Collections.singleton(topic2)).values().get(topic2).get()); assertEquals(new TopicDescription(topic3, false, new ArrayList() { { - add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); + add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); } }), mockAdminClient.describeTopics(Collections.singleton(topic3)).values().get(topic3).get()); @@ -132,7 +142,49 @@ public class InternalTopicManagerTest { assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), mockAdminClient.describeConfigs(Collections.singleton(resource)).values().get(resource).get().get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT), mockAdminClient.describeConfigs(Collections.singleton(resource2)).values().get(resource2).get().get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE), mockAdminClient.describeConfigs(Collections.singleton(resource3)).values().get(resource3).get().get(TopicConfig.CLEANUP_POLICY_CONFIG)); + } + @Test + public void shouldCompleteTopicValidationOnRetry() { + final AdminClient admin = EasyMock.createNiceMock(AdminClient.class); + final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config)); + final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1, + Collections.singletonList(broker1), Collections.singletonList(broker1)); + + final KafkaFutureImpl topicDescriptionSuccessFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl topicDescriptionFailFuture = new KafkaFutureImpl<>(); + topicDescriptionSuccessFuture.complete(new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet())); + topicDescriptionFailFuture.completeExceptionally(new UnknownTopicOrPartitionException("KABOOM!")); + + final KafkaFutureImpl topicCreationFuture = new KafkaFutureImpl<>(); + topicCreationFuture.completeExceptionally(new TopicExistsException("KABOOM!")); + + // let the first describe succeed on topic, and fail on topic2, and then let creation throws topics-existed; + // it should retry with just topic2 and then let it succeed + EasyMock.expect(admin.describeTopics(Utils.mkSet(topic, topic2))) + .andReturn(new MockDescribeTopicsResult(Utils.mkMap( + Utils.mkEntry(topic, topicDescriptionSuccessFuture), + Utils.mkEntry(topic2, topicDescriptionFailFuture) + ))).once(); + EasyMock.expect(admin.createTopics(Collections.singleton(new NewTopic(topic2, Optional.of(1), Optional.of((short) 1)) + .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT), + Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime")))))) + .andReturn(new MockCreateTopicsResult(Collections.singletonMap(topic2, topicCreationFuture))).once(); + EasyMock.expect(admin.describeTopics(Collections.singleton(topic2))) + .andReturn(new MockDescribeTopicsResult(Collections.singletonMap(topic2, topicDescriptionSuccessFuture))); + + EasyMock.replay(admin); + + final InternalTopicConfig topicConfig = new UnwindowedChangelogTopicConfig(topic, Collections.emptyMap()); + topicConfig.setNumberOfPartitions(1); + final InternalTopicConfig topic2Config = new UnwindowedChangelogTopicConfig(topic2, Collections.emptyMap()); + topic2Config.setNumberOfPartitions(1); + topicManager.makeReady(Utils.mkMap( + Utils.mkEntry(topic, topicConfig), + Utils.mkEntry(topic2, topic2Config) + )); + + EasyMock.verify(admin); } @Test @@ -142,14 +194,14 @@ public class InternalTopicManagerTest { topic, new ArrayList() { { - add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); - add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.emptyList())); + add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())); + add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.emptyList())); } }, null); try { - final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); + final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); internalTopicConfig.setNumberOfPartitions(1); internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)); fail("Should have thrown StreamsException"); @@ -161,7 +213,7 @@ public class InternalTopicManagerTest { mockAdminClient.addTopic( false, topic, - Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), + Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), null); // attempt to create it again with replication 1 @@ -213,11 +265,10 @@ public class InternalTopicManagerTest { topicConfigMap.put(topic, internalTopicConfig); topicConfigMap.put("internal-topic", internalTopicConfigII); - internalTopicManager.makeReady(topicConfigMap); boolean foundExpectedMessage = false; for (final String message : appender.getMessages()) { - foundExpectedMessage |= message.contains("Topic internal-topic is unknown or not found, hence not existed yet."); + foundExpectedMessage |= message.contains("Topic internal-topic is unknown or not found, hence not existed yet"); } assertTrue(foundExpectedMessage); @@ -243,4 +294,17 @@ public class InternalTopicManagerTest { } } + private class MockCreateTopicsResult extends CreateTopicsResult { + + MockCreateTopicsResult(final Map> futures) { + super(futures); + } + } + + private class MockDescribeTopicsResult extends DescribeTopicsResult { + + MockDescribeTopicsResult(final Map> futures) { + super(futures); + } + } }