From 4e722021a916a2a4183210dae70e6c95df2b5593 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 14 May 2020 13:53:28 -0700 Subject: [PATCH] MINOR: skip listOffsets request for newly created changelog topics (#8662) A small hotfix to avoid an extra probing rebalance the first time an application is launched. This should particularly improve the testing experience. Reviewer: Matthias J. Sax, John Roesler --- .../processor/internals/ClientUtils.java | 5 + .../internals/InternalTopicManager.java | 15 ++- .../internals/StreamsPartitionAssignor.java | 97 ++++++++----------- .../EosBetaUpgradeIntegrationTest.java | 2 - ...ilabilityStreamsPartitionAssignorTest.java | 5 +- .../StreamsPartitionAssignorTest.java | 41 +++++++- .../kafka/test/MockInternalTopicManager.java | 14 ++- 7 files changed, 108 insertions(+), 71 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java index 216753d3fa3..33aee6486fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java @@ -36,9 +36,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClientUtils { + private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class); + // currently admin client is shared among all threads public static String getSharedAdminClientId(final String clientId) { return clientId + "-admin"; @@ -105,6 +109,7 @@ public class ClientUtils { endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); } } catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) { + LOG.warn("listOffsets request 
failed.", e); throw new StreamsException("Unable to obtain end offsets from kafka", e); } return endOffsets; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index e6fb9263dd5..a05aa489530 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -93,16 +93,20 @@ public class InternalTopicManager { * If a topic does not exist creates a new topic. * If a topic with the correct number of partitions exists ignores it. * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again. + * @return the set of topics which had to be newly created */ - public void makeReady(final Map topics) { + public Set makeReady(final Map topics) { // we will do the validation / topic-creation in a loop, until we have confirmed all topics // have existed with the expected number of partitions, or some create topic returns fatal errors. 
+ log.debug("Starting to validate internal topics {} in partition assignor.", topics); int remainingRetries = retries; Set topicsNotReady = new HashSet<>(topics.keySet()); + final Set newlyCreatedTopics = new HashSet<>(); while (!topicsNotReady.isEmpty() && remainingRetries >= 0) { topicsNotReady = validateTopics(topicsNotReady, topics); + newlyCreatedTopics.addAll(topicsNotReady); if (!topicsNotReady.isEmpty()) { final Set newTopics = new HashSet<>(); @@ -169,6 +173,9 @@ public class InternalTopicManager { log.error(timeoutAndRetryError); throw new StreamsException(timeoutAndRetryError); } + log.debug("Completed validating internal topics and created {}", newlyCreatedTopics); + + return newlyCreatedTopics; } /** @@ -227,7 +234,11 @@ public class InternalTopicManager { final Set topicsToCreate = new HashSet<>(); for (final String topicName : topicsToValidate) { final Optional numberOfPartitions = topicsMap.get(topicName).numberOfPartitions(); - if (existedTopicPartition.containsKey(topicName) && numberOfPartitions.isPresent()) { + if (!numberOfPartitions.isPresent()) { + log.error("Found undefined number of partitions for topic {}", topicName); + throw new StreamsException("Topic " + topicName + " number of partitions not defined"); + } + if (existedTopicPartition.containsKey(topicName)) { if (!existedTopicPartition.get(topicName).equals(numberOfPartitions.get())) { final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " + "expected: %d; actual: %d. 
" + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 67d36ff8cd2..7e207d26633 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -492,7 +492,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf // make sure the repartition source topics exist with the right number of partitions, // create these topics if necessary - prepareTopic(repartitionTopicMetadata); + internalTopicManager.makeReady(repartitionTopicMetadata); // augment the metadata with the newly computed number of partitions for all the // repartition source topics @@ -640,14 +640,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } /** - * Resolve changelog topic metadata and create them if necessary. - * - * @return mapping of stateful tasks to their set of changelog topics + * Resolve changelog topic metadata and create them if necessary. Fills in the changelogsByStatefulTask map + * and returns the set of changelogs which were newly created. 
*/ - private Map> prepareChangelogTopics(final Map topicGroups, - final Map> tasksForTopicGroup) { - final Map> changelogsByStatefulTask = new HashMap<>(); - + private Set prepareChangelogTopics(final Map topicGroups, + final Map> tasksForTopicGroup, + final Map> changelogsByStatefulTask) { // add tasks to state change log topic subscribers final Map changelogTopicMetadata = new HashMap<>(); for (final Map.Entry entry : topicGroups.entrySet()) { @@ -685,9 +683,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } } - prepareTopic(changelogTopicMetadata); + final Set newlyCreatedTopics = internalTopicManager.makeReady(changelogTopicMetadata); log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values()); - return changelogsByStatefulTask; + return newlyCreatedTopics; } /** @@ -703,12 +701,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final Map> tasksForTopicGroup = new HashMap<>(); populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata); - final Map> changelogsByStatefulTask = - prepareChangelogTopics(topicGroups, tasksForTopicGroup); + final Map> changelogsByStatefulTask = new HashMap<>(); + final Set newlyCreatedChangelogs = prepareChangelogTopics(topicGroups, tasksForTopicGroup, changelogsByStatefulTask); final Map clientStates = new HashMap<>(); final boolean lagComputationSuccessful = - populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogsByStatefulTask); + populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogsByStatefulTask, newlyCreatedChangelogs); final Set allTasks = partitionsForTask.keySet(); final Set statefulTasks = changelogsByStatefulTask.keySet(); @@ -758,16 +756,26 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf private boolean populateClientStatesMap(final Map clientStates, final 
Map clientMetadataMap, final Map taskForPartition, - final Map> changelogsByStatefulTask) { + final Map> changelogsByStatefulTask, + final Set newlyCreatedChangelogs) { boolean fetchEndOffsetsSuccessful; Map allTaskEndOffsetSums; try { - final Collection allChangelogPartitions = changelogsByStatefulTask.values().stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); + final Collection allChangelogPartitions = + changelogsByStatefulTask.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + final Collection allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions); + allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic())); + + final Collection allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions); + allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions); + final Map endOffsets = - fetchEndOffsets(allChangelogPartitions, adminClient, Duration.ofMillis(adminClientTimeout)); - allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask); + fetchEndOffsets(allPreexistingChangelogPartitions, adminClient, Duration.ofMillis(adminClientTimeout)); + + allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions); fetchEndOffsetsSuccessful = true; } catch (final StreamsException e) { allTaskEndOffsetSums = null; @@ -794,7 +802,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf * @return Map from stateful task to its total end offset summed across all changelog partitions */ private Map computeEndOffsetSumsByTask(final Map endOffsets, - final Map> changelogsByStatefulTask) { + final Map> changelogsByStatefulTask, + final Collection newlyCreatedChangelogPartitions) { final Map taskEndOffsetSums = new HashMap<>(); for (final Map.Entry> taskEntry : changelogsByStatefulTask.entrySet()) { final 
TaskId task = taskEntry.getKey(); @@ -802,12 +811,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf taskEndOffsetSums.put(task, 0L); for (final TopicPartition changelog : changelogs) { - final ListOffsetsResultInfo offsetResult = endOffsets.get(changelog); - if (offsetResult == null) { - log.debug("Fetched end offsets did not contain the changelog {} of task {}", changelog, task); - throw new IllegalStateException("Could not get end offset for " + changelog); + final long changelogEndOffset; + if (newlyCreatedChangelogPartitions.contains(changelog)) { + changelogEndOffset = 0L; + } else { + final ListOffsetsResultInfo offsetResult = endOffsets.get(changelog); + if (offsetResult == null) { + log.debug("Fetched end offsets did not contain the changelog {} of task {}", changelog, task); + throw new IllegalStateException("Could not get end offset for " + changelog); + } + changelogEndOffset = offsetResult.offset(); } - final long newEndOffsetSum = taskEndOffsetSums.get(task) + offsetResult.offset(); + final long newEndOffsetSum = taskEndOffsetSums.get(task) + changelogEndOffset; if (newEndOffsetSum < 0) { taskEndOffsetSums.put(task, Long.MAX_VALUE); break; @@ -1525,38 +1540,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } } - /** - * Internal helper function that creates a Kafka topic - * - * @param topicPartitions Map that contains the topic names to be created with the number of partitions - */ - private void prepareTopic(final Map topicPartitions) { - log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions); - - // first construct the topics to make ready - final Map topicsToMakeReady = new HashMap<>(); - - for (final InternalTopicConfig topic : topicPartitions.values()) { - final Optional numPartitions = topic.numberOfPartitions(); - if (!numPartitions.isPresent()) { - throw new StreamsException( - String.format("%sTopic [%s] number of partitions not 
defined", - logPrefix, topic.name()) - ); - } - if (!topic.hasEnforcedNumberOfPartitions()) { - topic.setNumberOfPartitions(numPartitions.get()); - } - topicsToMakeReady.put(topic.name(), topic); - } - - if (!topicsToMakeReady.isEmpty()) { - internalTopicManager.makeReady(topicsToMakeReady); - } - - log.debug("Completed validating internal topics {} in partition assignor.", topicPartitions); - } - private void ensureCopartitioning(final Collection> copartitionGroups, final Map allRepartitionTopicsNumPartitions, final Cluster metadata) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index aac9a8ac88c..6a550e714b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -55,7 +55,6 @@ import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -166,7 +165,6 @@ public class EosBetaUpgradeIntegrationTest { } @Test - @Ignore public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { // We use two KafkaStreams clients that we upgrade from eos-alpha to eos-beta. During the upgrade, // we ensure that there are pending transaction and verify that data is processed correctly. 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java index 06f866fcd3f..deccfa4800e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java @@ -179,7 +179,10 @@ public class HighAvailabilityStreamsPartitionAssignorTest { } private void overwriteInternalTopicManagerWithMock() { - final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer); + final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( + streamsConfig, + mockClientSupplier.restoreConsumer, + false); partitionAssignor.setInternalTopicManager(mockInternalTopicManager); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 644e2709014..82edeb053a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -235,7 +235,7 @@ public class StreamsPartitionAssignorTest { partitionAssignor.configure(configMap); EasyMock.replay(taskManager, adminClient); - return overwriteInternalTopicManagerWithMock(); + return overwriteInternalTopicManagerWithMock(false); } private void createDefaultMockTaskManager() { @@ -281,9 +281,13 @@ public class StreamsPartitionAssignorTest { EasyMock.replay(result); } - private MockInternalTopicManager overwriteInternalTopicManagerWithMock() { - final 
MockInternalTopicManager mockInternalTopicManager = - new MockInternalTopicManager(new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer); + // If mockCreateInternalTopics is true the internal topic manager will report that it had to create all internal + // topics and we will skip the listOffsets request for these changelogs + private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) { + final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( + new StreamsConfig(configProps()), + mockClientSupplier.restoreConsumer, + mockCreateInternalTopics); partitionAssignor.setInternalTopicManager(mockInternalTopicManager); return mockInternalTopicManager; } @@ -1863,6 +1867,35 @@ public class StreamsPartitionAssignorTest { assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions))); } + @Test + public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() { + adminClient = EasyMock.createMock(AdminClient.class); + final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class); + final KafkaFutureImpl> allFuture = new KafkaFutureImpl<>(); + allFuture.complete(emptyMap()); + + expect(adminClient.listOffsets(emptyMap())).andStubReturn(result); + expect(result.all()).andReturn(allFuture); + + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); + builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1"); + + subscriptions.put("consumer10", + new Subscription( + singletonList("topic1"), + defaultSubscriptionInfo.encode() + )); + + EasyMock.replay(result); + configureDefault(); + overwriteInternalTopicManagerWithMock(true); + + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)); + + EasyMock.verify(adminClient); + } + private static ByteBuffer 
encodeFutureSubscription() { final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */); buf.putInt(LATEST_SUPPORTED_VERSION + 1); diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java index 548657ad96a..6a1947ecd20 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import java.util.Collections; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.common.PartitionInfo; @@ -29,21 +30,23 @@ import java.util.List; import java.util.Map; import java.util.Set; - public class MockInternalTopicManager extends InternalTopicManager { - final public Map readyTopics = new HashMap<>(); - final private MockConsumer restoreConsumer; + public final Map readyTopics = new HashMap<>(); + private final MockConsumer restoreConsumer; + private final boolean mockCreateInternalTopics; public MockInternalTopicManager(final StreamsConfig streamsConfig, - final MockConsumer restoreConsumer) { + final MockConsumer restoreConsumer, + final boolean mockCreateInternalTopics) { super(Admin.create(streamsConfig.originals()), streamsConfig); this.restoreConsumer = restoreConsumer; + this.mockCreateInternalTopics = mockCreateInternalTopics; } @Override - public void makeReady(final Map topics) { + public Set makeReady(final Map topics) { for (final InternalTopicConfig topic : topics.values()) { final String topicName = topic.name(); final int numberOfPartitions = topic.numberOfPartitions().get(); @@ -56,6 +59,7 @@ public class MockInternalTopicManager extends InternalTopicManager { restoreConsumer.updatePartitions(topicName, partitions); } + return mockCreateInternalTopics ? 
topics.keySet() : Collections.emptySet(); } @Override