diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 682ccf58cce..ee627c91ef2 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.util.EnumSet;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.common.KafkaException;
@@ -60,9 +61,13 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -1093,4 +1098,52 @@ public final class Utils {
             )
         );
     }
+
+    /**
+     * A Collector that offers two kinds of convenience:
+     * 1. You can specify the concrete type of the returned Map
+     * 2. You can turn a stream of Entries directly into a Map without having to mess with a key function
+     *    and a value function. In particular, this is handy if all you need to do is apply a filter to a Map's entries.
+     *
+     *
+     * One thing to be wary of: These types are too "distant" for IDE type checkers to warn you if you
+     * try to do something like build a TreeMap of non-Comparable elements. You'd get a runtime exception for that.
+     *
+     * @param mapSupplier The constructor for your concrete map type.
+     * @param <K>         The Map key type
+     * @param <V>         The Map value type
+     * @param <M>         The type of the Map itself.
+     * @return new Collector<Map.Entry<K, V>, M, M>
+     */
+    public static <K, V, M extends Map<K, V>> Collector<Map.Entry<K, V>, M, M> entriesToMap(final Supplier<M> mapSupplier) {
+        return new Collector<Map.Entry<K, V>, M, M>() {
+            @Override
+            public Supplier<M> supplier() {
+                return mapSupplier;
+            }
+
+            @Override
+            public BiConsumer<M, Map.Entry<K, V>> accumulator() {
+                return (map, entry) -> map.put(entry.getKey(), entry.getValue());
+            }
+
+            @Override
+            public BinaryOperator<M> combiner() {
+                return (map, map2) -> {
+                    map.putAll(map2);
+                    return map;
+                };
+            }
+
+            @Override
+            public Function<M, M> finisher() {
+                return map -> map;
+            }
+
+            @Override
+            public Set<Characteristics> characteristics() {
+                return EnumSet.of(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH);
+            }
+        };
+    }
 }
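A quick usage sketch of the Collector above (illustration only, not part of the patch; the map contents are made up). Filtering a Map's entries into a chosen concrete type needs no key/value extractor functions:

    import static org.apache.kafka.common.utils.Utils.entriesToMap;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    final Map<String, Integer> lags = new HashMap<>();
    lags.put("task-a", 5);
    lags.put("task-b", 0);
    lags.put("task-c", 9);

    // Keep only the entries with a positive value, collecting straight into
    // a TreeMap so the result is sorted by key.
    final TreeMap<String, Integer> laggy = lags.entrySet()
        .stream()
        .filter(entry -> entry.getValue() > 0)
        .collect(entriesToMap(TreeMap::new));
    // laggy now holds {task-a=5, task-c=9}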
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index e5c534886c6..085af0ef797 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -63,24 +63,6 @@ public class AssignmentTestUtils {
      * @return the UUID created by repeating the digit n in the UUID format
      */
     static UUID uuidForInt(final Integer n) {
-        if (n < 1 || n > 7) {
-            throw new IllegalArgumentException("Must pass in a single digit number to the uuid builder, got n = " + n);
-        }
-        final StringBuilder builder = new StringBuilder(36);
-        for (int i = 0; i < 8; ++i) {
-            builder.append(n);
-        }
-        builder.append('-');
-
-        for (int i = 0; i < 3; ++i) {
-            for (int j = 0; j < 4; ++j) {
-                builder.append(n);
-            }
-            builder.append('-');
-        }
-        for (int i = 0; i < 12; ++i) {
-            builder.append(n);
-        }
-        return UUID.fromString(builder.toString());
+        return new UUID(0, n);
     }
 }
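For context on the simplification above: the java.util.UUID(long, long) constructor takes the most- and least-significant 64 bits directly, so uuidForInt(n) now yields the zero-padded value of n rather than a repeated digit, and the old restriction to n between 1 and 7 goes away. A minimal sketch of the resulting behavior:

    import java.util.UUID;

    // Node 3 renders as 00000000-0000-0000-0000-000000000003
    System.out.println(new UUID(0, 3));
    // Small non-negative values also compare in numeric order, which keeps
    // iteration order predictable in the TreeMaps used by the tests below.
    assert new UUID(0, 3).compareTo(new UUID(0, 10)) < 0;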
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
new file mode 100644
index 00000000000..c47fc3766a1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.hamcrest.MatcherAssert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.entriesToMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+public class TaskAssignorConvergenceTest {
+    private static final class Harness {
+        private final Set<TaskId> statelessTasks;
+        private final Map<TaskId, Long> statefulTaskEndOffsetSums;
+        private final Map<UUID, ClientState> clientStates;
+        private final Map<UUID, ClientState> droppedClientStates;
+        private final StringBuilder history = new StringBuilder();
+
+        private static Harness initializeCluster(final int numStatelessTasks,
+                                                 final int numStatefulTasks,
+                                                 final int numNodes) {
+            int subtopology = 0;
+            final Set<TaskId> statelessTasks = new TreeSet<>();
+            {
+                int partition = 0;
+                for (int i = 0; i < numStatelessTasks; i++) {
+                    statelessTasks.add(new TaskId(subtopology, partition));
+                    if (partition == 4) {
+                        subtopology++;
+                        partition = 0;
+                    } else {
+                        partition++;
+                    }
+                }
+            }
+
+            final Map<TaskId, Long> statefulTaskEndOffsetSums = new TreeMap<>();
+            {
+                subtopology++;
+                int partition = 0;
+                for (int i = 0; i < numStatefulTasks; i++) {
+                    statefulTaskEndOffsetSums.put(new TaskId(subtopology, partition), 150000L);
+
+                    if (partition == 4) {
+                        subtopology++;
+                        partition = 0;
+                    } else {
+                        partition++;
+                    }
+                }
+            }
+
+            final Map<UUID, ClientState> clientStates = new TreeMap<>();
+            for (int i = 0; i < numNodes; i++) {
+                final UUID uuid = uuidForInt(i);
+                clientStates.put(uuid, emptyInstance(uuid, statefulTaskEndOffsetSums));
+            }
+
+            return new Harness(statelessTasks, statefulTaskEndOffsetSums, clientStates);
+        }
+
+        private Harness(final Set<TaskId> statelessTasks,
+                        final Map<TaskId, Long> statefulTaskEndOffsetSums,
+                        final Map<UUID, ClientState> clientStates) {
+            this.statelessTasks = statelessTasks;
+            this.statefulTaskEndOffsetSums = statefulTaskEndOffsetSums;
+            this.clientStates = clientStates;
+            droppedClientStates = new TreeMap<>();
+            history.append('\n');
+            history.append("Cluster and application initial state: \n");
+            history.append("Stateless tasks: ").append(statelessTasks).append('\n');
+            history.append("Stateful tasks: ").append(statefulTaskEndOffsetSums.keySet()).append('\n');
+            formatClientStates(true);
+            history.append("History of the cluster: \n");
+        }
+
+        private void addNode() {
+            final UUID uuid = uuidForInt(clientStates.size() + droppedClientStates.size());
+            history.append("Adding new node ").append(uuid).append('\n');
+            clientStates.put(uuid, emptyInstance(uuid, statefulTaskEndOffsetSums));
+        }
+
+        private static ClientState emptyInstance(final UUID uuid, final Map<TaskId, Long> allTaskEndOffsetSums) {
+            final ClientState clientState = new ClientState(1);
+            clientState.computeTaskLags(uuid, allTaskEndOffsetSums);
+            return clientState;
+        }
+
+        private void addOrResurrectNodesRandomly(final Random prng, final int limit) {
+            final int numberToAdd = prng.nextInt(limit);
+            for (int i = 0; i < numberToAdd; i++) {
+                final boolean addNew = prng.nextBoolean();
+                if (addNew || droppedClientStates.isEmpty()) {
+                    addNode();
+                } else {
+                    final UUID uuid = selectRandomElement(prng, droppedClientStates);
+                    history.append("Resurrecting node ").append(uuid).append('\n');
+                    clientStates.put(uuid, droppedClientStates.get(uuid));
+                    droppedClientStates.remove(uuid);
+                }
+            }
+        }
+
+        private void dropNode() {
+            if (clientStates.isEmpty()) {
+                throw new NoSuchElementException("There are no nodes to drop");
+            } else {
+                final UUID toDrop = clientStates.keySet().iterator().next();
+                dropNode(toDrop);
+            }
+        }
+
+        private void dropRandomNodes(final int numNode, final Random prng) {
+            int dropped = 0;
+            while (!clientStates.isEmpty() && dropped < numNode) {
+                final UUID toDrop = selectRandomElement(prng, clientStates);
+                dropNode(toDrop);
+                dropped++;
+            }
+            history.append("Stateless tasks: ").append(statelessTasks).append('\n');
+            history.append("Stateful tasks: ").append(statefulTaskEndOffsetSums.keySet()).append('\n');
+            formatClientStates(true);
+        }
+
+        private void dropNode(final UUID toDrop) {
+            final ClientState clientState = clientStates.remove(toDrop);
+            history.append("Dropping node ").append(toDrop).append(": ").append(clientState).append('\n');
+            droppedClientStates.put(toDrop, clientState);
+        }
+
+        private static UUID selectRandomElement(final Random prng, final Map<UUID, ClientState> clients) {
+            int dropIndex = prng.nextInt(clients.size());
+            UUID toDrop = null;
+            for (final UUID uuid : clients.keySet()) {
+                if (dropIndex == 0) {
+                    toDrop = uuid;
+                    break;
+                } else {
+                    dropIndex--;
+                }
+            }
+            return toDrop;
+        }
+
+        /**
+         * Flip the cluster states from "assigned" to "subscribed" so they can be used for another round of assignments.
+         */
+        private void prepareForNextRebalance() {
+            final Map<UUID, ClientState> newClientStates = new TreeMap<>();
+            for (final Map.Entry<UUID, ClientState> entry : clientStates.entrySet()) {
+                final UUID uuid = entry.getKey();
+                final ClientState newClientState = new ClientState(1);
+                final ClientState clientState = entry.getValue();
+                final Map<TaskId, Long> taskOffsetSums = new TreeMap<>();
+                for (final TaskId taskId : clientState.activeTasks()) {
+                    if (statefulTaskEndOffsetSums.containsKey(taskId)) {
+                        taskOffsetSums.put(taskId, statefulTaskEndOffsetSums.get(taskId));
+                    }
+                }
+                for (final TaskId taskId : clientState.standbyTasks()) {
+                    if (statefulTaskEndOffsetSums.containsKey(taskId)) {
+                        taskOffsetSums.put(taskId, statefulTaskEndOffsetSums.get(taskId));
+                    }
+                }
+                newClientState.addPreviousActiveTasks(clientState.activeTasks());
+                newClientState.addPreviousStandbyTasks(clientState.standbyTasks());
+                newClientState.addPreviousTasksAndOffsetSums(taskOffsetSums);
+                newClientState.computeTaskLags(uuid, statefulTaskEndOffsetSums);
+                newClientStates.put(uuid, newClientState);
+            }
+
+            clientStates.clear();
+            clientStates.putAll(newClientStates);
+        }
+
+        private void recordBefore(final int iteration) {
+            history.append("Starting Iteration: ").append(iteration).append('\n');
+            formatClientStates(false);
+        }
+
+        private void recordAfter(final int iteration, final boolean rebalancePending) {
+            history.append("After assignment: ").append(iteration).append('\n');
+            history.append("Rebalance pending: ").append(rebalancePending).append('\n');
+            formatClientStates(true);
+            history.append('\n');
+        }
+
+        private void formatClientStates(final boolean printUnassigned) {
+            final Set<TaskId> unassignedTasks = new TreeSet<>();
+            unassignedTasks.addAll(statefulTaskEndOffsetSums.keySet());
+            unassignedTasks.addAll(statelessTasks);
+            history.append('{').append('\n');
+            for (final Map.Entry<UUID, ClientState> entry : clientStates.entrySet()) {
+                history.append("  ").append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
+                unassignedTasks.removeAll(entry.getValue().assignedTasks());
+            }
+            history.append('}').append('\n');
+            if (printUnassigned) {
+                history.append("Unassigned Tasks: ").append(unassignedTasks).append('\n');
+            }
+        }
+
+    }
+
+    @Test
+    public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                1,
+                                                                2,
+                                                                0,
+                                                                1000L);
+
+        final Harness harness = Harness.initializeCluster(1, 1, 1);
+
+        testForConvergence(harness, configs, 1);
+        verifyValidAssignment(0, harness);
+    }
+
+    @Test
+    public void assignmentShouldConvergeAfterAddingNode() {
+        final int numStatelessTasks = 15;
+        final int numStatefulTasks = 13;
+        final int maxWarmupReplicas = 2;
+        final int numStandbyReplicas = 0;
+
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                1,
+                                                                maxWarmupReplicas,
+                                                                numStandbyReplicas,
+                                                                1000L);
+
+        final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1);
+        testForConvergence(harness, configs, 1);
+        harness.addNode();
+        // we expect convergence to involve moving each task at most once, and we can move "maxWarmupReplicas" number
+        // of tasks at once, hence the iteration limit
+        testForConvergence(harness, configs, numStatefulTasks / maxWarmupReplicas + 1);
+        verifyValidAssignment(numStandbyReplicas, harness);
+    }
+
+    @Ignore // Adding this failing test before adding the code that fixes it
+    @Test
+    public void droppingNodesShouldConverge() {
+        final int numStatelessTasks = 15;
+        final int numStatefulTasks = 13;
+        final int maxWarmupReplicas = 2;
+        final int numStandbyReplicas = 0;
+
+        final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                1,
+                                                                maxWarmupReplicas,
+                                                                numStandbyReplicas,
+                                                                1000L);
+
+        final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7);
+        testForConvergence(harness, configs, 1);
+        harness.dropNode();
+        // This time, we allow one extra iteration because the
+        // first stateful task needs to get shuffled back to the first node
+        testForConvergence(harness, configs, numStatefulTasks / maxWarmupReplicas + 2);
+
+        verifyValidAssignment(numStandbyReplicas, harness);
+    }
+
+    @Ignore // Adding this failing test before adding the code that fixes it
+    @Test
+    public void randomClusterPerturbationsShouldConverge() {
+        // do as many tests as we can in 10 seconds
+        final long deadline = System.currentTimeMillis() + 10_000L;
+        do {
+            final long seed = new Random().nextLong();
+            runRandomizedScenario(seed);
+        } while (System.currentTimeMillis() < deadline);
+    }
+
+    private static void runRandomizedScenario(final long seed) {
+        Harness harness = null;
+        try {
+            final Random prng = new Random(seed);
+
+            // These are all rand(limit)+1 because we need them to be at least 1 and the upper bound is exclusive
+            final int initialClusterSize = prng.nextInt(10) + 1;
+            final int numStatelessTasks = prng.nextInt(10) + 1;
+            final int numStatefulTasks = prng.nextInt(10) + 1;
+            final int maxWarmupReplicas = prng.nextInt(numStatefulTasks) + 1;
+            // This one is rand(limit+1) because we _want_ to test zero and the upper bound is exclusive
+            final int numStandbyReplicas = prng.nextInt(initialClusterSize + 1);
+
+            final int numberOfEvents = prng.nextInt(10) + 1;
+
+            final AssignmentConfigs configs = new AssignmentConfigs(100L,
+                                                                    1,
+                                                                    maxWarmupReplicas,
+                                                                    numStandbyReplicas,
+                                                                    1000L);
+
+            harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, initialClusterSize);
+            testForConvergence(harness, configs, 1);
+            verifyValidAssignment(numStandbyReplicas, harness);
+
+            for (int i = 0; i < numberOfEvents; i++) {
+                final int event = prng.nextInt(2);
+                switch (event) {
+                    case 0:
+                        harness.dropRandomNodes(prng.nextInt(initialClusterSize), prng);
+                        break;
+                    case 1:
+                        harness.addOrResurrectNodesRandomly(prng, initialClusterSize);
+                        break;
+                    default:
+                        throw new IllegalStateException("Unexpected event: " + event);
+                }
+                if (!harness.clientStates.isEmpty()) {
+                    testForConvergence(harness, configs, numStatefulTasks * 2);
+                    verifyValidAssignment(numStandbyReplicas, harness);
+                }
+            }
+        } catch (final AssertionError t) {
+            throw new AssertionError(
+                "Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(" + seed + ")`.",
+                t
+            );
+        } catch (final Throwable t) {
+            final StringBuilder builder =
+                new StringBuilder()
+                    .append("Exception in randomized scenario. Reproduce with: `runRandomizedScenario(")
+                    .append(seed)
"); + if (harness != null) { + builder.append(harness.history); + } + throw new AssertionError(builder.toString(), t); + } + } + + private static void verifyValidAssignment(final int numStandbyReplicas, final Harness harness) { + final Map> assignments = new TreeMap<>(); + for (final TaskId taskId : harness.statefulTaskEndOffsetSums.keySet()) { + assignments.put(taskId, new TreeSet<>()); + } + for (final TaskId taskId : harness.statelessTasks) { + assignments.put(taskId, new TreeSet<>()); + } + for (final Map.Entry entry : harness.clientStates.entrySet()) { + for (final TaskId activeTask : entry.getValue().activeTasks()) { + if (assignments.containsKey(activeTask)) { + assignments.get(activeTask).add(entry.getKey()); + } + } + for (final TaskId standbyTask : entry.getValue().standbyTasks()) { + assignments.get(standbyTask).add(entry.getKey()); + } + } + final TreeMap> misassigned = + assignments + .entrySet() + .stream() + .filter(entry -> { + final int expectedActives = 1; + final boolean isStateless = harness.statelessTasks.contains(entry.getKey()); + final int expectedStandbys = isStateless ? 0 : numStandbyReplicas; + // We'll never assign even the expected number of standbys if they don't actually fit in the cluster + final int expectedAssignments = Math.min( + harness.clientStates.size(), + expectedActives + expectedStandbys + ); + return entry.getValue().size() != expectedAssignments; + }) + .collect(entriesToMap(TreeMap::new)); + + MatcherAssert.assertThat( + new StringBuilder().append("Found some over- or under-assigned tasks in the final assignment with ") + .append(numStandbyReplicas) + .append(" standby replicas.") + .append(harness.history) + .toString(), + misassigned, + is(emptyMap())); + } + + private static void testForConvergence(final Harness harness, + final AssignmentConfigs configs, + final int iterationLimit) { + final Set allTasks = new TreeSet<>(); + allTasks.addAll(harness.statelessTasks); + allTasks.addAll(harness.statefulTaskEndOffsetSums.keySet()); + + boolean rebalancePending = true; + int iteration = 0; + while (rebalancePending && iteration < iterationLimit) { + iteration++; + harness.prepareForNextRebalance(); + harness.recordBefore(iteration); + rebalancePending = new HighAvailabilityTaskAssignor( + harness.clientStates, allTasks, + harness.statefulTaskEndOffsetSums.keySet(), + configs + ).assign(); + harness.recordAfter(iteration, rebalancePending); + } + + if (rebalancePending) { + final StringBuilder message = + new StringBuilder().append("Rebalances have not converged after iteration cutoff: ") + .append(iterationLimit) + .append(harness.history); + fail(message.toString()); + } + } + + +}