Browse Source

MINOR: skip listOffsets request for newly created changelog topics (#8662)

A small hotfix to avoid an extra probing rebalance the first time an application is launched.
This should particularly improve the testing experience.

Reviewer: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>
pull/8674/head
A. Sophie Blee-Goldman 5 years ago committed by GitHub
parent
commit
4e722021a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
  2. 15
      streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
  3. 85
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
  4. 2
      streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
  5. 5
      streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
  6. 41
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
  7. 14
      streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java

@ -36,9 +36,13 @@ import java.util.concurrent.TimeUnit; @@ -36,9 +36,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
// currently admin client is shared among all threads
public static String getSharedAdminClientId(final String clientId) {
return clientId + "-admin";
@ -105,6 +109,7 @@ public class ClientUtils { @@ -105,6 +109,7 @@ public class ClientUtils {
endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
}
} catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) {
LOG.warn("listOffsets request failed.", e);
throw new StreamsException("Unable to obtain end offsets from kafka", e);
}
return endOffsets;

15
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java

@ -93,16 +93,20 @@ public class InternalTopicManager { @@ -93,16 +93,20 @@ public class InternalTopicManager {
* If a topic does not exist creates a new topic.
* If a topic with the correct number of partitions exists ignores it.
* If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
* @return the set of topics which had to be newly created
*/
public void makeReady(final Map<String, InternalTopicConfig> topics) {
public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
// we will do the validation / topic-creation in a loop, until we have confirmed all topics
// have existed with the expected number of partitions, or some create topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition assignor.", topics);
int remainingRetries = retries;
Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
topicsNotReady = validateTopics(topicsNotReady, topics);
newlyCreatedTopics.addAll(topicsNotReady);
if (!topicsNotReady.isEmpty()) {
final Set<NewTopic> newTopics = new HashSet<>();
@ -169,6 +173,9 @@ public class InternalTopicManager { @@ -169,6 +173,9 @@ public class InternalTopicManager {
log.error(timeoutAndRetryError);
throw new StreamsException(timeoutAndRetryError);
}
log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);
return newlyCreatedTopics;
}
/**
@ -227,7 +234,11 @@ public class InternalTopicManager { @@ -227,7 +234,11 @@ public class InternalTopicManager {
final Set<String> topicsToCreate = new HashSet<>();
for (final String topicName : topicsToValidate) {
final Optional<Integer> numberOfPartitions = topicsMap.get(topicName).numberOfPartitions();
if (existedTopicPartition.containsKey(topicName) && numberOfPartitions.isPresent()) {
if (!numberOfPartitions.isPresent()) {
log.error("Found undefined number of partitions for topic {}", topicName);
throw new StreamsException("Topic " + topicName + " number of partitions not defined");
}
if (existedTopicPartition.containsKey(topicName)) {
if (!existedTopicPartition.get(topicName).equals(numberOfPartitions.get())) {
final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " +
"expected: %d; actual: %d. " +

85
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

@ -492,7 +492,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -492,7 +492,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// make sure the repartition source topics exist with the right number of partitions,
// create these topics if necessary
prepareTopic(repartitionTopicMetadata);
internalTopicManager.makeReady(repartitionTopicMetadata);
// augment the metadata with the newly computed number of partitions for all the
// repartition source topics
@ -640,14 +640,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -640,14 +640,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
/**
* Resolve changelog topic metadata and create them if necessary.
*
* @return mapping of stateful tasks to their set of changelog topics
* Resolve changelog topic metadata and create them if necessary. Fills in the changelogsByStatefulTask map
* and returns the set of changelogs which were newly created.
*/
private Map<TaskId, Set<TopicPartition>> prepareChangelogTopics(final Map<Integer, TopicsInfo> topicGroups,
final Map<Integer, Set<TaskId>> tasksForTopicGroup) {
final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask = new HashMap<>();
private Set<String> prepareChangelogTopics(final Map<Integer, TopicsInfo> topicGroups,
final Map<Integer, Set<TaskId>> tasksForTopicGroup,
final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask) {
// add tasks to state change log topic subscribers
final Map<String, InternalTopicConfig> changelogTopicMetadata = new HashMap<>();
for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
@ -685,9 +683,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -685,9 +683,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
}
prepareTopic(changelogTopicMetadata);
final Set<String> newlyCreatedTopics = internalTopicManager.makeReady(changelogTopicMetadata);
log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values());
return changelogsByStatefulTask;
return newlyCreatedTopics;
}
/**
@ -703,12 +701,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -703,12 +701,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata);
final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask =
prepareChangelogTopics(topicGroups, tasksForTopicGroup);
final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask = new HashMap<>();
final Set<String> newlyCreatedChangelogs = prepareChangelogTopics(topicGroups, tasksForTopicGroup, changelogsByStatefulTask);
final Map<UUID, ClientState> clientStates = new HashMap<>();
final boolean lagComputationSuccessful =
populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogsByStatefulTask);
populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogsByStatefulTask, newlyCreatedChangelogs);
final Set<TaskId> allTasks = partitionsForTask.keySet();
final Set<TaskId> statefulTasks = changelogsByStatefulTask.keySet();
@ -758,16 +756,26 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -758,16 +756,26 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private boolean populateClientStatesMap(final Map<UUID, ClientState> clientStates,
final Map<UUID, ClientMetadata> clientMetadataMap,
final Map<TopicPartition, TaskId> taskForPartition,
final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask) {
final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask,
final Set<String> newlyCreatedChangelogs) {
boolean fetchEndOffsetsSuccessful;
Map<TaskId, Long> allTaskEndOffsetSums;
try {
final Collection<TopicPartition> allChangelogPartitions = changelogsByStatefulTask.values().stream()
final Collection<TopicPartition> allChangelogPartitions =
changelogsByStatefulTask.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
fetchEndOffsets(allChangelogPartitions, adminClient, Duration.ofMillis(adminClientTimeout));
allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask);
fetchEndOffsets(allPreexistingChangelogPartitions, adminClient, Duration.ofMillis(adminClientTimeout));
allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
fetchEndOffsetsSuccessful = true;
} catch (final StreamsException e) {
allTaskEndOffsetSums = null;
@ -794,7 +802,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -794,7 +802,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
* @return Map from stateful task to its total end offset summed across all changelog partitions
*/
private Map<TaskId, Long> computeEndOffsetSumsByTask(final Map<TopicPartition, ListOffsetsResultInfo> endOffsets,
final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask) {
final Map<TaskId, Set<TopicPartition>> changelogsByStatefulTask,
final Collection<TopicPartition> newlyCreatedChangelogPartitions) {
final Map<TaskId, Long> taskEndOffsetSums = new HashMap<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> taskEntry : changelogsByStatefulTask.entrySet()) {
final TaskId task = taskEntry.getKey();
@ -802,12 +811,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -802,12 +811,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
taskEndOffsetSums.put(task, 0L);
for (final TopicPartition changelog : changelogs) {
final long changelogEndOffset;
if (newlyCreatedChangelogPartitions.contains(changelog)) {
changelogEndOffset = 0L;
} else {
final ListOffsetsResultInfo offsetResult = endOffsets.get(changelog);
if (offsetResult == null) {
log.debug("Fetched end offsets did not contain the changelog {} of task {}", changelog, task);
throw new IllegalStateException("Could not get end offset for " + changelog);
}
final long newEndOffsetSum = taskEndOffsetSums.get(task) + offsetResult.offset();
changelogEndOffset = offsetResult.offset();
}
final long newEndOffsetSum = taskEndOffsetSums.get(task) + changelogEndOffset;
if (newEndOffsetSum < 0) {
taskEndOffsetSums.put(task, Long.MAX_VALUE);
break;
@ -1525,38 +1540,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @@ -1525,38 +1540,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
}
/**
* Internal helper function that creates a Kafka topic
*
* @param topicPartitions Map that contains the topic names to be created with the number of partitions
*/
private void prepareTopic(final Map<String, InternalTopicConfig> topicPartitions) {
log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions);
// first construct the topics to make ready
final Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>();
for (final InternalTopicConfig topic : topicPartitions.values()) {
final Optional<Integer> numPartitions = topic.numberOfPartitions();
if (!numPartitions.isPresent()) {
throw new StreamsException(
String.format("%sTopic [%s] number of partitions not defined",
logPrefix, topic.name())
);
}
if (!topic.hasEnforcedNumberOfPartitions()) {
topic.setNumberOfPartitions(numPartitions.get());
}
topicsToMakeReady.put(topic.name(), topic);
}
if (!topicsToMakeReady.isEmpty()) {
internalTopicManager.makeReady(topicsToMakeReady);
}
log.debug("Completed validating internal topics {} in partition assignor.", topicPartitions);
}
private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups,
final Map<String, InternalTopicConfig> allRepartitionTopicsNumPartitions,
final Cluster metadata) {

2
streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java

@ -55,7 +55,6 @@ import org.apache.kafka.test.StreamsTestUtils; @@ -55,7 +55,6 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@ -166,7 +165,6 @@ public class EosBetaUpgradeIntegrationTest { @@ -166,7 +165,6 @@ public class EosBetaUpgradeIntegrationTest {
}
@Test
@Ignore
public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
// We use two KafkaStreams clients that we upgrade from eos-alpha to eos-beta. During the upgrade,
// we ensure that there are pending transaction and verify that data is processed correctly.

5
streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java

@ -179,7 +179,10 @@ public class HighAvailabilityStreamsPartitionAssignorTest { @@ -179,7 +179,10 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
}
private void overwriteInternalTopicManagerWithMock() {
final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
streamsConfig,
mockClientSupplier.restoreConsumer,
false);
partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
}

41
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java

@ -235,7 +235,7 @@ public class StreamsPartitionAssignorTest { @@ -235,7 +235,7 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.configure(configMap);
EasyMock.replay(taskManager, adminClient);
return overwriteInternalTopicManagerWithMock();
return overwriteInternalTopicManagerWithMock(false);
}
private void createDefaultMockTaskManager() {
@ -281,9 +281,13 @@ public class StreamsPartitionAssignorTest { @@ -281,9 +281,13 @@ public class StreamsPartitionAssignorTest {
EasyMock.replay(result);
}
private MockInternalTopicManager overwriteInternalTopicManagerWithMock() {
final MockInternalTopicManager mockInternalTopicManager =
new MockInternalTopicManager(new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer);
// If mockCreateInternalTopics is true the internal topic manager will report that it had to create all internal
// topics and we will skip the listOffsets request for these changelogs
private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) {
final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
new StreamsConfig(configProps()),
mockClientSupplier.restoreConsumer,
mockCreateInternalTopics);
partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
return mockInternalTopicManager;
}
@ -1863,6 +1867,35 @@ public class StreamsPartitionAssignorTest { @@ -1863,6 +1867,35 @@ public class StreamsPartitionAssignorTest {
assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
}
@Test
public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() {
adminClient = EasyMock.createMock(AdminClient.class);
final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
final KafkaFutureImpl<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = new KafkaFutureImpl<>();
allFuture.complete(emptyMap());
expect(adminClient.listOffsets(emptyMap())).andStubReturn(result);
expect(result.all()).andReturn(allFuture);
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
builder.addStateStore(new MockKeyValueStoreBuilder("store1", false), "processor1");
subscriptions.put("consumer10",
new Subscription(
singletonList("topic1"),
defaultSubscriptionInfo.encode()
));
EasyMock.replay(result);
configureDefault();
overwriteInternalTopicManagerWithMock(true);
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions));
EasyMock.verify(adminClient);
}
private static ByteBuffer encodeFutureSubscription() {
final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
buf.putInt(LATEST_SUPPORTED_VERSION + 1);

14
streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.common.PartitionInfo;
@ -29,21 +30,23 @@ import java.util.List; @@ -29,21 +30,23 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
public class MockInternalTopicManager extends InternalTopicManager {
final public Map<String, Integer> readyTopics = new HashMap<>();
final private MockConsumer<byte[], byte[]> restoreConsumer;
public final Map<String, Integer> readyTopics = new HashMap<>();
private final MockConsumer<byte[], byte[]> restoreConsumer;
private final boolean mockCreateInternalTopics;
public MockInternalTopicManager(final StreamsConfig streamsConfig,
final MockConsumer<byte[], byte[]> restoreConsumer) {
final MockConsumer<byte[], byte[]> restoreConsumer,
final boolean mockCreateInternalTopics) {
super(Admin.create(streamsConfig.originals()), streamsConfig);
this.restoreConsumer = restoreConsumer;
this.mockCreateInternalTopics = mockCreateInternalTopics;
}
@Override
public void makeReady(final Map<String, InternalTopicConfig> topics) {
public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
for (final InternalTopicConfig topic : topics.values()) {
final String topicName = topic.name();
final int numberOfPartitions = topic.numberOfPartitions().get();
@ -56,6 +59,7 @@ public class MockInternalTopicManager extends InternalTopicManager { @@ -56,6 +59,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
restoreConsumer.updatePartitions(topicName, partitions);
}
return mockCreateInternalTopics ? topics.keySet() : Collections.emptySet();
}
@Override

Loading…
Cancel
Save