
KAFKA-15022: [1/N] initial implementation of rack aware assignor (#13851)

Part of KIP-925. Adds the first internal classes to track rack.id client/partition metadata.

Reviewers: Matthias J. Sax <matthias@confluent.io>
Hao Li committed 1 year ago (via GitHub), parent commit 0e56cc8841
4 changed files:
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java (78 changes)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java (187 changes)
  3. streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java (58 changes)
  4. streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java (292 changes)

78 changes: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.Map.Entry;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
@@ -27,6 +28,7 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.TopicConfig;
@@ -409,6 +411,43 @@ public class InternalTopicManager {
return brokerSideConfigEntry.value();
}
public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
log.debug("Starting to describe topics {} in partition assignor.", topics);
long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
final Set<String> topicsToDescribe = new HashSet<>(topics);
final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
while (!topicsToDescribe.isEmpty()) {
final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
topicPartitionInfo.putAll(existed);
topicsToDescribe.removeAll(topicPartitionInfo.keySet());
if (!topicsToDescribe.isEmpty()) {
currentWallClockMs = time.milliseconds();
if (currentWallClockMs >= deadlineMs) {
final String timeoutError = String.format(
"Could not create topics within %d milliseconds. " +
"This can happen if the Kafka cluster is temporarily not available.",
retryTimeoutMs);
log.error(timeoutError);
throw new TimeoutException(timeoutError);
}
log.info(
"Topics {} could not be describe fully. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
topics,
retryBackOffMs,
deadlineMs - currentWallClockMs
);
Utils.sleep(retryBackOffMs);
}
}
log.debug("Completed describing topics");
return topicPartitionInfo;
}
/**
* Prepares a set of given internal topics.
*
@@ -539,24 +578,22 @@ public class InternalTopicManager {
}
/**
* Try to get the number of partitions for the given topics; return the number of partitions for topics that already exist.
* Try to get the partition information for the given topics; return the partition info for topics that already exist.
*
* Topics that could not be described will simply not be returned
*/
// visible for testing
protected Map<String, Integer> getNumPartitions(final Set<String> topics,
final Set<String> tempUnknownTopics) {
log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
protected Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics,
final Set<String> tempUnknownTopics) {
final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.topicNameValues();
final Map<String, Integer> existedTopicPartition = new HashMap<>();
final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
final String topicName = topicFuture.getKey();
try {
final TopicDescription topicDescription = topicFuture.getValue().get();
existedTopicPartition.put(topicName, topicDescription.partitions().size());
topicPartitionInfo.put(topicName, topicDescription.partitions());
} catch (final InterruptedException fatalException) {
// this should not happen; if it ever happens it indicates a bug
Thread.currentThread().interrupt();
@@ -565,15 +602,19 @@ public class InternalTopicManager {
} catch (final ExecutionException couldNotDescribeTopicException) {
final Throwable cause = couldNotDescribeTopicException.getCause();
if (cause instanceof UnknownTopicOrPartitionException) {
// This topic didn't exist, proceed to try to create it
// This topic didn't exist
log.debug("Topic {} is unknown or not found, hence not existed yet.\n" +
"Error message was: {}", topicName, cause.toString());
} else if (cause instanceof LeaderNotAvailableException) {
tempUnknownTopics.add(topicName);
if (tempUnknownTopics != null) {
tempUnknownTopics.add(topicName);
}
log.debug("The leader of topic {} is not available.\n" +
"Error message was: {}", topicName, cause.toString());
} else if (cause instanceof TimeoutException) {
tempUnknownTopics.add(topicName);
if (tempUnknownTopics != null) {
tempUnknownTopics.add(topicName);
}
log.debug("Describing topic {} (to get number of partitions) timed out.\n" +
"Error message was: {}", topicName, cause.toString());
} else {
@@ -584,7 +625,22 @@ public class InternalTopicManager {
}
}
return existedTopicPartition;
return topicPartitionInfo;
}
/**
* Try to get the number of partitions for the given topics; return the number of partitions for topics that already exist.
*
* Topics that could not be described will simply not be returned
*/
// visible for testing
protected Map<String, Integer> getNumPartitions(final Set<String> topics,
final Set<String> tempUnknownTopics) {
log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = getTopicPartitionInfo(topics, tempUnknownTopics);
return topicPartitionInfo.entrySet().stream().collect(Collectors.toMap(
Entry::getKey, e -> e.getValue().size()));
}
/**

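As a hedged illustration of the retry contract of the new public getTopicPartitionInfo overload above (the topicManager instance and topic name are placeholders): the method re-describes any still-unknown topics, sleeping retryBackOffMs between attempts, and throws a TimeoutException once retryTimeoutMs has elapsed.

// Illustrative caller; "my-changelog" and topicManager are placeholders.
try {
    final Map<String, List<TopicPartitionInfo>> info =
        topicManager.getTopicPartitionInfo(Collections.singleton("my-changelog"));
    for (final TopicPartitionInfo partition : info.get("my-changelog")) {
        // replicas() carries the rack-tagged Nodes that the rack-aware assignor reads
        System.out.println(partition.partition() + " -> " + partition.replicas());
    }
} catch (final TimeoutException timeout) {
    // not all topics could be described before the deadline
}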
187 additions: streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java

@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RackAwareTaskAssignor {
private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
private final Cluster fullMetadata;
private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
private final Map<UUID, Map<String, Optional<String>>> processRacks;
private final AssignmentConfigs assignmentConfigs;
private final Map<TopicPartition, Set<String>> racksForPartition;
private final InternalTopicManager internalTopicManager;
public RackAwareTaskAssignor(final Cluster fullMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
final Map<UUID, Map<String, Optional<String>>> processRacks,
final InternalTopicManager internalTopicManager,
final AssignmentConfigs assignmentConfigs) {
this.fullMetadata = fullMetadata;
this.partitionsForTask = partitionsForTask;
this.processRacks = processRacks;
this.internalTopicManager = internalTopicManager;
this.assignmentConfigs = assignmentConfigs;
this.racksForPartition = new HashMap<>();
}
public synchronized boolean canEnableRackAwareAssignorForActiveTasks() {
/*
TODO: enable this after we add the config
if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
canEnableForActive = false;
return false;
}
*/
if (!validateClientRack()) {
return false;
}
return validateTopicPartitionRack();
}
public boolean canEnableRackAwareAssignorForStandbyTasks() {
// TODO
return false;
}
// Visible for testing. This method also checks that all TopicPartitions exist in the cluster
public boolean populateTopicsToDescribe(final Set<String> topicsToDescribe) {
// Make sure a rackId exists for every needed TopicPartition
for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
for (final TopicPartition topicPartition : topicPartitions) {
final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
if (partitionInfo == null) {
log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
return false;
}
final Node[] replica = partitionInfo.replicas();
if (replica == null || replica.length == 0) {
topicsToDescribe.add(topicPartition.topic());
continue;
}
for (final Node node : replica) {
if (node.hasRack()) {
racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
} else {
log.warn("Node {} for topic partition {} doesn't have rack", node, topicPartition);
return false;
}
}
}
}
return true;
}
private boolean validateTopicPartitionRack() {
// Make sure a rackId exists for every needed TopicPartition
final Set<String> topicsToDescribe = new HashSet<>();
if (!populateTopicsToDescribe(topicsToDescribe)) {
return false;
}
if (!topicsToDescribe.isEmpty()) {
log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);
try {
final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
if (topicsToDescribe.size() > topicPartitionInfo.size()) {
topicsToDescribe.removeAll(topicPartitionInfo.keySet());
log.error("Failed to describe topic for {}", topicsToDescribe);
return false;
}
for (final Map.Entry<String, List<TopicPartitionInfo>> entry : topicPartitionInfo.entrySet()) {
final List<TopicPartitionInfo> partitionInfos = entry.getValue();
for (final TopicPartitionInfo partitionInfo : partitionInfos) {
final int partition = partitionInfo.partition();
final List<Node> replicas = partitionInfo.replicas();
if (replicas == null || replicas.isEmpty()) {
log.error("No replicas found for topic partition {}: {}", entry.getKey(), partition);
return false;
}
final TopicPartition topicPartition = new TopicPartition(entry.getKey(), partition);
for (final Node node : replicas) {
if (node.hasRack()) {
racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
} else {
return false;
}
}
}
}
} catch (final Exception e) {
log.error("Failed to describe topics {}", topicsToDescribe, e);
return false;
}
}
return true;
}
// Visible for testing
public boolean validateClientRack() {
/*
* Check that rack information is populated correctly in clients:
* 1. A rackId exists for every client
* 2. Different consumers of the same process have the same rackId
*/
for (final Map.Entry<UUID, Map<String, Optional<String>>> entry : processRacks.entrySet()) {
final UUID processId = entry.getKey();
KeyValue<String, String> previousRackInfo = null;
for (final Map.Entry<String, Optional<String>> rackEntry : entry.getValue().entrySet()) {
if (!rackEntry.getValue().isPresent()) {
log.warn("RackId doesn't exist for process {} and consumer {}. Disable {}",
processId, rackEntry.getKey(), getClass().getName());
return false;
}
if (previousRackInfo == null) {
previousRackInfo = KeyValue.pair(rackEntry.getKey(), rackEntry.getValue().get());
} else if (!previousRackInfo.value.equals(rackEntry.getValue().get())) {
log.error(
"Consumers {} and {} for same process {} has different rackId {} and {}. File a ticket for this bug. Disable {}",
previousRackInfo.key,
rackEntry.getKey(),
entry.getKey(),
previousRackInfo.value,
rackEntry.getValue().get(),
getClass().getName());
return false;
}
}
}
return true;
}
}

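To make the client-side validation rules concrete, a small sketch of inputs that pass and fail validateClientRack; the process IDs (processA, processB) and consumer names are made up:

// Both consumers of processA report "rack1", so this map validates.
final Map<UUID, Map<String, Optional<String>>> consistent = new HashMap<>();
consistent.computeIfAbsent(processA, k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
consistent.computeIfAbsent(processA, k -> new HashMap<>()).put("consumer2", Optional.of("rack1"));

// processB's consumers disagree on the rack, so validateClientRack returns false
// (and logs an error, since this indicates a bug elsewhere).
final Map<UUID, Map<String, Optional<String>>> inconsistent = new HashMap<>();
inconsistent.computeIfAbsent(processB, k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
inconsistent.computeIfAbsent(processB, k -> new HashMap<>()).put("consumer2", Optional.of("rack2"));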
58 changes: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java

@@ -279,6 +279,26 @@ public class InternalTopicManagerTest {
+ " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));
}
@Test
public void shouldThrowTimeoutExceptionInGetPartitionInfo() {
setupTopicInMockAdminClient(topic1, Collections.emptyMap());
final MockTime time = new MockTime(5);
mockAdminClient.timeoutNextRequest(Integer.MAX_VALUE);
final InternalTopicManager internalTopicManager =
new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
final TimeoutException exception = assertThrows(
TimeoutException.class,
() -> internalTopicManager.getTopicPartitionInfo(Collections.singleton(topic1))
);
assertThat(
exception.getMessage(),
is("Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available.")
);
}
@Test
public void shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {
setupTopicInMockAdminClient(topic1, Collections.emptyMap());
@@ -305,6 +325,31 @@
);
}
@Test
public void shouldThrowTimeoutExceptionIfGetPartitionInfoHasTopicDescriptionTimeout() {
mockAdminClient.timeoutNextRequest(1);
final InternalTopicManager internalTopicManager =
new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
try {
final Set<String> topic1set = new HashSet<>(Collections.singletonList(topic1));
internalTopicManager.getTopicPartitionInfo(topic1set, null);
} catch (final TimeoutException expected) {
assertEquals(TimeoutException.class, expected.getCause().getClass());
}
mockAdminClient.timeoutNextRequest(1);
try {
final Set<String> topic2set = new HashSet<>(Collections.singletonList(topic2));
internalTopicManager.getTopicPartitionInfo(topic2set, null);
} catch (final TimeoutException expected) {
assertEquals(TimeoutException.class, expected.getCause().getClass());
}
}
@Test
public void shouldThrowTimeoutExceptionIfGetNumPartitionsHasTopicDescriptionTimeout() {
mockAdminClient.timeoutNextRequest(1);
@@ -631,6 +676,19 @@
internalTopicManager.getNumPartitions(Collections.singleton(topic1), Collections.emptySet()));
}
@Test
public void shouldReturnCorrectPartitionInfo() {
final TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList());
mockAdminClient.addTopic(
false,
topic1,
Collections.singletonList(topicPartitionInfo),
null);
final Map<String, List<TopicPartitionInfo>> ret = internalTopicManager.getTopicPartitionInfo(Collections.singleton(topic1));
assertEquals(Collections.singletonMap(topic1, Collections.singletonList(topicPartitionInfo)), ret);
}
@Test
public void shouldCreateRequiredTopics() throws Exception {
final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());

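A note on the timing in shouldThrowTimeoutExceptionInGetPartitionInfo above, sketched under the assumption that MockTime behaves as org.apache.kafka.common.utils.MockTime does: constructing it with an auto-tick value advances the clock on every read, so the retry loop's deadline check trips quickly without real sleeps.

// Sketch: MockTime(5) advances the mock clock 5 ms per lookup, so repeated
// time.milliseconds() calls inside the retry loop soon pass the 50 ms deadline.
final MockTime time = new MockTime(5);
final long t0 = time.milliseconds();
final long t1 = time.milliseconds();
// t1 - t0 == 5, even though no wall-clock time has passed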
292 additions: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java

@@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.junit.Test;
public class RackAwareTaskAssignorTest {
private final static String TOPIC0 = "topic0";
private final static String TOPIC1 = "topic1";
private static final String USER_END_POINT = "localhost:8080";
private static final String APPLICATION_ID = "stream-partition-assignor-test";
private final Node node0 = new Node(0, "node0", 1, "rack1");
private final Node node1 = new Node(1, "node1", 1, "rack2");
private final Node node2 = new Node(2, "node2", 1, "rack3");
private final Node noRackNode = new Node(3, "node3", 1);
private final Node[] replicas1 = new Node[] {node0, node1};
private final Node[] replicas2 = new Node[] {node1, node2};
private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas1, replicas1);
private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas2, replicas2);
private final TopicPartition partitionWithoutInfo00 = new TopicPartition(TOPIC0, 0);
private final TopicPartition partitionWithoutInfo01 = new TopicPartition(TOPIC0, 1);
private final TopicPartition partitionWithoutInfo10 = new TopicPartition(TOPIC1, 0);
private final UUID process0UUID = UUID.randomUUID();
private final UUID process1UUID = UUID.randomUUID();
private final Subtopology subtopology1 = new Subtopology(1, "topology1");
private final MockTime time = new MockTime();
private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
time,
streamsConfig,
mockClientSupplier.restoreConsumer,
false
);
private Map<String, Object> configProps() {
final Map<String, Object> configurationMap = new HashMap<>();
configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
final ReferenceContainer referenceContainer = new ReferenceContainer();
/*
referenceContainer.mainConsumer = consumer;
referenceContainer.adminClient = adminClient;
referenceContainer.taskManager = taskManager;
referenceContainer.streamsMetadataState = streamsMetadataState;
referenceContainer.time = time;
*/
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
return configurationMap;
}
@Test
public void disableActiveSinceMissingClusterInfo() {
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getClusterForTopic0(),
getTaskTopicPartitionMapForTask1(true),
getTopologyGroupTaskMap(),
getProcessRacksForProcess0(),
mockInternalTopicManager,
new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
);
// False since partitionWithoutInfo10 is missing from the cluster metadata
assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
assertFalse(assignor.populateTopicsToDescribe(new HashSet<>()));
assertTrue(assignor.validateClientRack());
}
@Test
public void disableActiveSinceRackMissingInNode() {
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getClusterWithPartitionMissingRack(),
getTaskTopicPartitionMapForTask1(),
getTopologyGroupTaskMap(),
getProcessRacksForProcess0(),
mockInternalTopicManager,
new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
);
assertTrue(assignor.validateClientRack());
assertFalse(assignor.populateTopicsToDescribe(new HashSet<>()));
// False since nodeMissingRack contains a node which doesn't have a rack
assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
}
@Test
public void disableActiveSinceRackMissingInClient() {
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getClusterForTopic0(),
getTaskTopicPartitionMapForTask1(),
getTopologyGroupTaskMap(),
getProcessRacksForProcess0(true),
mockInternalTopicManager,
new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
);
// False since process1 doesn't have rackId
assertFalse(assignor.validateClientRack());
assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
}
@Test
public void disableActiveSinceRackDiffersInSameProcess() {
final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
// Different consumers in the same process have different rack IDs. This shouldn't happen;
// if it does, there's a bug somewhere
processRacks.computeIfAbsent(process0UUID, k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
processRacks.computeIfAbsent(process0UUID, k -> new HashMap<>()).put("consumer2", Optional.of("rack2"));
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getClusterForTopic0(),
getTaskTopicPartitionMapForTask1(),
getTopologyGroupTaskMap(),
processRacks,
mockInternalTopicManager,
new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
);
assertFalse(assignor.validateClientRack());
assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
}
@Test
public void enableRackAwareAssignorForActiveWithoutDescribingTopics() {
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getClusterForTopic0(),
getTaskTopicPartitionMapForTask1(),
getTopologyGroupTaskMap(),
getProcessRacksForProcess0(),
mockInternalTopicManager,
new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
);
// partitionWithoutInfo00 has rackInfo in cluster metadata
assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks());
}
@Test
public void enableRackAwareAssignorForActiveWithDescribingTopics() {
final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager);
doReturn(
Collections.singletonMap(
TOPIC0,
Collections.singletonList(
new TopicPartitionInfo(0, node0, Arrays.asList(replicas1), Collections.emptyList())
)
)
).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0));
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getClusterWithNoNode(),
getTaskTopicPartitionMapForTask1(),
getTopologyGroupTaskMap(),
getProcessRacksForProcess0(),
spyTopicManager,
new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
);
assertTrue(assignor.canEnableRackAwareAssignorForActiveTasks());
}
@Test
public void disableRackAwareAssignorForActiveWithDescribingTopicsFailure() {
final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager);
doThrow(new TimeoutException("Timeout describing topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0));
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getClusterWithNoNode(),
getTaskTopicPartitionMapForTask1(),
getTopologyGroupTaskMap(),
getProcessRacksForProcess0(),
spyTopicManager,
new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
);
assertFalse(assignor.canEnableRackAwareAssignorForActiveTasks());
assertTrue(assignor.populateTopicsToDescribe(new HashSet<>()));
}
private Cluster getClusterForTopic0() {
return new Cluster(
"cluster",
new HashSet<>(Arrays.asList(node0, node1, node2)),
new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
Collections.emptySet(),
Collections.emptySet()
);
}
private Cluster getClusterWithPartitionMissingRack() {
final Node[] nodeMissingRack = new Node[]{node0, noRackNode};
final PartitionInfo partitionInfoMissingNode = new PartitionInfo(TOPIC0, 0, node0, nodeMissingRack, nodeMissingRack);
return new Cluster(
"cluster",
new HashSet<>(Arrays.asList(node0, node1, node2)),
new HashSet<>(Arrays.asList(partitionInfoMissingNode, partitionInfo01)),
Collections.emptySet(),
Collections.emptySet()
);
}
private Cluster getClusterWithNoNode() {
final PartitionInfo noNodeInfo = new PartitionInfo(TOPIC0, 0, null, new Node[0], new Node[0]);
return new Cluster(
"cluster",
new HashSet<>(Arrays.asList(node0, node1, node2, Node.noNode())), // mockClientSupplier.setCluster requires noNode
Collections.singleton(noNodeInfo),
Collections.emptySet(),
Collections.emptySet()
);
}
private Map<UUID, Map<String, Optional<String>>> getProcessRacksForProcess0() {
return getProcessRacksForProcess0(false);
}
private Map<UUID, Map<String, Optional<String>>> getProcessRacksForProcess0(final boolean missingRack) {
final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
final Optional<String> rack = missingRack ? Optional.empty() : Optional.of("rack1");
processRacks.put(process0UUID, Collections.singletonMap("consumer1", rack));
return processRacks;
}
private Map<TaskId, Set<TopicPartition>> getTaskTopicPartitionMapForTask1() {
return getTaskTopicPartitionMapForTask1(false);
}
private Map<TaskId, Set<TopicPartition>> getTaskTopicPartitionMapForTask1(final boolean extraTopic) {
final Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(partitionWithoutInfo00);
if (extraTopic) {
topicPartitions.add(partitionWithoutInfo10);
}
return Collections.singletonMap(new TaskId(1, 1), topicPartitions);
}
private Map<Subtopology, Set<TaskId>> getTopologyGroupTaskMap() {
return Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1)));
}
}