diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index ee537eba788..9575ba6dc42 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -39,7 +39,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; /** @@ -185,24 +187,49 @@ public class StickyAssignor extends AbstractPartitionAssignor { // these schemas are used for preserving consumer's previously assigned partitions // list and sending it as user data to the leader during a rebalance - private static final String TOPIC_PARTITIONS_KEY_NAME = "previous_assignment"; - private static final String TOPIC_KEY_NAME = "topic"; - private static final String PARTITIONS_KEY_NAME = "partitions"; - private static final Schema TOPIC_ASSIGNMENT = new Schema( + static final String TOPIC_PARTITIONS_KEY_NAME = "previous_assignment"; + static final String TOPIC_KEY_NAME = "topic"; + static final String PARTITIONS_KEY_NAME = "partitions"; + private static final String GENERATION_KEY_NAME = "generation"; + private static final int DEFAULT_GENERATION = -1; + static final Schema TOPIC_ASSIGNMENT = new Schema( new Field(TOPIC_KEY_NAME, Type.STRING), new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32))); - private static final Schema STICKY_ASSIGNOR_USER_DATA = new Schema( + static final Schema STICKY_ASSIGNOR_USER_DATA_V0 = new Schema( new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT))); + private static final Schema STICKY_ASSIGNOR_USER_DATA_V1 = new Schema( + new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT)), + new Field(GENERATION_KEY_NAME, Type.INT32)); private List memberAssignment = null; private PartitionMovements partitionMovements; + private int generation = DEFAULT_GENERATION; // consumer group generation + + static final class ConsumerUserData { + final List partitions; + final Optional generation; + ConsumerUserData(List partitions, Optional generation) { + this.partitions = partitions; + this.generation = generation; + } + } + + static final class ConsumerGenerationPair { + final String consumer; + final int generation; + ConsumerGenerationPair(String consumer, int generation) { + this.consumer = consumer; + this.generation = generation; + } + } public Map> assign(Map partitionsPerTopic, Map subscriptions) { Map> currentAssignment = new HashMap<>(); + Map prevAssignment = new HashMap<>(); partitionMovements = new PartitionMovements(); - prepopulateCurrentAssignments(subscriptions, currentAssignment); + prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment); boolean isFreshAssignment = currentAssignment.isEmpty(); // a mapping of all topic partitions to all consumers that can be assigned to them @@ -213,12 +240,12 @@ public class StickyAssignor extends AbstractPartitionAssignor { // initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops for (Entry entry: partitionsPerTopic.entrySet()) { for (int i = 0; i < entry.getValue(); ++i) - partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList()); + partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>()); } for (Entry entry: subscriptions.entrySet()) { String consumer = entry.getKey(); - consumer2AllPotentialPartitions.put(consumer, new ArrayList()); + consumer2AllPotentialPartitions.put(consumer, new ArrayList<>()); entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> { for (int i = 0; i < partitionsPerTopic.get(topic); ++i) { TopicPartition topicPartition = new TopicPartition(topic, i); @@ -229,7 +256,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist if (!currentAssignment.containsKey(consumer)) - currentAssignment.put(consumer, new ArrayList()); + currentAssignment.put(consumer, new ArrayList<>()); } // a mapping of partition to current consumer @@ -239,7 +266,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { currentPartitionConsumer.put(topicPartition, entry.getKey()); List sortedPartitions = sortPartitions( - currentAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions); + currentAssignment, prevAssignment.keySet(), isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions); // all partitions that need to be assigned (initially set to all partitions but adjusted in the following loop) List unassignedPartitions = new ArrayList<>(sortedPartitions); @@ -278,23 +305,68 @@ public class StickyAssignor extends AbstractPartitionAssignor { TreeSet sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment)); sortedCurrentSubscriptions.addAll(currentAssignment.keySet()); - balance(currentAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, + balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer); return currentAssignment; } private void prepopulateCurrentAssignments(Map subscriptions, - Map> currentAssignment) { - for (Map.Entry subscriptionEntry : subscriptions.entrySet()) { + Map> currentAssignment, + Map prevAssignment) { + // we need to process subscriptions' user data with each consumer's reported generation in mind + // higher generations overwrite lower generations in case of a conflict + // note that a conflict could exists only if user data is for different generations + + // for each partition we create a sorted map of its consumers by generation + Map> sortedPartitionConsumersByGeneration = new HashMap<>(); + for (Map.Entry subscriptionEntry: subscriptions.entrySet()) { + String consumer = subscriptionEntry.getKey(); ByteBuffer userData = subscriptionEntry.getValue().userData(); - if (userData != null && userData.hasRemaining()) - currentAssignment.put(subscriptionEntry.getKey(), deserializeTopicPartitionAssignment(userData)); + if (userData == null || !userData.hasRemaining()) continue; + ConsumerUserData consumerUserData = deserializeTopicPartitionAssignment(userData); + + for (TopicPartition partition: consumerUserData.partitions) { + if (sortedPartitionConsumersByGeneration.containsKey(partition)) { + Map consumers = sortedPartitionConsumersByGeneration.get(partition); + if (consumerUserData.generation.isPresent() && consumers.containsKey(consumerUserData.generation.get())) { + // same partition is assigned to two consumers during the same rebalance. + // log a warning and skip this record + log.warn("Partition '{}' is assigned to multiple consumers following sticky assignment generation {}.", + partition, consumerUserData.generation); + } else + consumers.put(consumerUserData.generation.orElse(DEFAULT_GENERATION), consumer); + } else { + TreeMap sortedConsumers = new TreeMap<>(); + sortedConsumers.put(consumerUserData.generation.orElse(DEFAULT_GENERATION), consumer); + sortedPartitionConsumersByGeneration.put(partition, sortedConsumers); + } + } + } + + // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition + // current and previous consumers are the last two consumers of each partition in the above sorted map + for (Map.Entry> partitionConsumersEntry: sortedPartitionConsumersByGeneration.entrySet()) { + TopicPartition partition = partitionConsumersEntry.getKey(); + TreeMap consumers = partitionConsumersEntry.getValue(); + Iterator it = consumers.descendingKeySet().iterator(); + + // let's process the current (most recent) consumer first + String consumer = consumers.get(it.next()); + currentAssignment.computeIfAbsent(consumer, k -> new ArrayList<>()); + currentAssignment.get(consumer).add(partition); + + // now update previous assignment if any + if (it.hasNext()) { + int generation = it.next(); + prevAssignment.put(partition, new ConsumerGenerationPair(consumers.get(generation), generation)); + } } } @Override - public void onAssignment(Assignment assignment) { + public void onAssignment(Assignment assignment, int generation) { memberAssignment = assignment.partitions(); + this.generation = generation; } @Override @@ -302,7 +374,8 @@ public class StickyAssignor extends AbstractPartitionAssignor { if (memberAssignment == null) return new Subscription(new ArrayList<>(topics)); - return new Subscription(new ArrayList<>(topics), serializeTopicPartitionAssignment(memberAssignment)); + return new Subscription(new ArrayList<>(topics), + serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation)))); } @Override @@ -310,6 +383,10 @@ public class StickyAssignor extends AbstractPartitionAssignor { return "sticky"; } + int generation() { + return generation; + } + /** * determine if the current assignment is a balanced one * @@ -395,12 +472,16 @@ public class StickyAssignor extends AbstractPartitionAssignor { * that causes minimal partition movement among consumers (hence honoring maximal stickiness) * * @param currentAssignment the calculated assignment so far + * @param partitionsWithADifferentPreviousAssignment partitions that had a different consumer before (for every + * such partition there should also be a mapping in + * @currentAssignment to a different consumer) * @param isFreshAssignment whether this is a new assignment, or a reassignment of an existing one * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consumer from * @return sorted list of valid partitions */ private List sortPartitions(Map> currentAssignment, + Set partitionsWithADifferentPreviousAssignment, boolean isFreshAssignment, Map> partition2AllPotentialConsumers, Map> consumer2AllPotentialPartitions) { @@ -421,11 +502,28 @@ public class StickyAssignor extends AbstractPartitionAssignor { } TreeSet sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments)); sortedConsumers.addAll(assignments.keySet()); + // at this point, sortedConsumers contains an ascending-sorted list of consumers based on + // how many valid partitions are currently assigned to them while (!sortedConsumers.isEmpty()) { + // take the consumer with the most partitions String consumer = sortedConsumers.pollLast(); + // currently assigned partitions to this consumer List remainingPartitions = assignments.get(consumer); - if (!remainingPartitions.isEmpty()) { + // partitions that were assigned to a different consumer last time + List prevPartitions = new ArrayList<>(partitionsWithADifferentPreviousAssignment); + // from partitions that had a different consumer before, keep only those that are + // assigned to this consumer now + prevPartitions.retainAll(remainingPartitions); + if (!prevPartitions.isEmpty()) { + // if there is a partition of this consumer that was assigned to another consumer before + // mark it as good options for reassignment + TopicPartition partition = prevPartitions.remove(0); + remainingPartitions.remove(partition); + sortedPartitions.add(partition); + sortedConsumers.add(consumer); + } else if (!remainingPartitions.isEmpty()) { + // otherwise, mark any other one of the current partitions as a reassignment candidate sortedPartitions.add(remainingPartitions.remove(0)); sortedConsumers.add(consumer); } @@ -459,17 +557,13 @@ public class StickyAssignor extends AbstractPartitionAssignor { if (!hasIdenticalListElements(partition2AllPotentialConsumers.values())) return false; - if (!hasIdenticalListElements(consumer2AllPotentialPartitions.values())) - return false; - - return true; + return hasIdenticalListElements(consumer2AllPotentialPartitions.values()); } /** - * @return the consumer to which the given partition is assigned. The assignment should improve the overall balance - * of the partition assignments to consumers. + * The assignment should improve the overall balance of the partition assignments to consumers. */ - private String assignPartition(TopicPartition partition, + private void assignPartition(TopicPartition partition, TreeSet sortedCurrentSubscriptions, Map> currentAssignment, Map> consumer2AllPotentialPartitions, @@ -480,10 +574,9 @@ public class StickyAssignor extends AbstractPartitionAssignor { currentAssignment.get(consumer).add(partition); currentPartitionConsumer.put(partition, consumer); sortedCurrentSubscriptions.add(consumer); - return consumer; + break; } } - return null; } private boolean canParticipateInReassignment(TopicPartition partition, @@ -519,6 +612,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { * Balance the current assignment using the data structures created in the assign(...) method above. */ private void balance(Map> currentAssignment, + Map prevAssignment, List sortedPartitions, List unassignedPartitions, TreeSet sortedCurrentSubscriptions, @@ -558,7 +652,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { Map> preBalanceAssignment = deepCopy(currentAssignment); Map preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer); - reassignmentPerformed = performReassignments(sortedPartitions, currentAssignment, sortedCurrentSubscriptions, + reassignmentPerformed = performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer); // if we are not preserving existing assignments and we have made changes to the current assignment @@ -581,6 +675,7 @@ public class StickyAssignor extends AbstractPartitionAssignor { private boolean performReassignments(List reassignablePartitions, Map> currentAssignment, + Map prevAssignment, TreeSet sortedCurrentSubscriptions, Map> consumer2AllPotentialPartitions, Map> partition2AllPotentialConsumers, @@ -606,6 +701,14 @@ public class StickyAssignor extends AbstractPartitionAssignor { if (consumer == null) log.error("Expected partition '{}' to be assigned to a consumer", partition); + if (prevAssignment.containsKey(partition) && + currentAssignment.get(consumer).size() > currentAssignment.get(prevAssignment.get(partition).consumer).size() + 1) { + reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment.get(partition).consumer); + reassignmentPerformed = true; + modified = true; + continue; + } + // check if a better-suited consumer exist for the partition; if so, reassign it for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) { if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) { @@ -626,8 +729,6 @@ public class StickyAssignor extends AbstractPartitionAssignor { TreeSet sortedCurrentSubscriptions, Map currentPartitionConsumer, Map> consumer2AllPotentialPartitions) { - String consumer = currentPartitionConsumer.get(partition); - // find the new consumer String newConsumer = null; for (String anotherConsumer: sortedCurrentSubscriptions) { @@ -639,11 +740,18 @@ public class StickyAssignor extends AbstractPartitionAssignor { assert newConsumer != null; + reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, newConsumer); + } + + private void reassignPartition(TopicPartition partition, + Map> currentAssignment, + TreeSet sortedCurrentSubscriptions, + Map currentPartitionConsumer, + String newConsumer) { + String consumer = currentPartitionConsumer.get(partition); // find the correct partition movement considering the stickiness requirement TopicPartition partitionToBeMoved = partitionMovements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer); processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer); - - return; } private void processPartitionMovement(TopicPartition partition, @@ -669,24 +777,39 @@ public class StickyAssignor extends AbstractPartitionAssignor { return partitionMovements.isSticky(); } - static ByteBuffer serializeTopicPartitionAssignment(List partitions) { - Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA); + static ByteBuffer serializeTopicPartitionAssignment(ConsumerUserData consumerUserData) { + Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1); List topicAssignments = new ArrayList<>(); - for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) { + for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(consumerUserData.partitions).entrySet()) { Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT); topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey()); topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); topicAssignments.add(topicAssignment); } struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray()); - ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA.sizeOf(struct)); - STICKY_ASSIGNOR_USER_DATA.write(buffer, struct); + if (consumerUserData.generation.isPresent()) + struct.set(GENERATION_KEY_NAME, consumerUserData.generation.get()); + ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct)); + STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct); buffer.flip(); return buffer; } - private static List deserializeTopicPartitionAssignment(ByteBuffer buffer) { - Struct struct = STICKY_ASSIGNOR_USER_DATA.read(buffer); + private static ConsumerUserData deserializeTopicPartitionAssignment(ByteBuffer buffer) { + Struct struct; + ByteBuffer copy = buffer.duplicate(); + try { + struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer); + } catch (Exception e1) { + try { + // fall back to older schema + struct = STICKY_ASSIGNOR_USER_DATA_V0.read(copy); + } catch (Exception e2) { + // ignore the consumer's previous assignment if it cannot be parsed + return new ConsumerUserData(Collections.emptyList(), Optional.of(DEFAULT_GENERATION)); + } + } + List partitions = new ArrayList<>(); for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) { Struct assignment = (Struct) structObj; @@ -696,7 +819,9 @@ public class StickyAssignor extends AbstractPartitionAssignor { partitions.add(new TopicPartition(topic, partition)); } } - return partitions; + // make sure this is backward compatible + Optional generation = struct.hasField(GENERATION_KEY_NAME) ? Optional.of(struct.getInt(GENERATION_KEY_NAME)) : Optional.empty(); + return new ConsumerUserData(partitions, generation); } /** @@ -794,11 +919,11 @@ public class StickyAssignor extends AbstractPartitionAssignor { String topic = partition.topic(); if (!partitionMovementsByTopic.containsKey(topic)) - partitionMovementsByTopic.put(topic, new HashMap>()); + partitionMovementsByTopic.put(topic, new HashMap<>()); Map> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic); if (!partitionMovementsForThisTopic.containsKey(pair)) - partitionMovementsForThisTopic.put(pair, new HashSet()); + partitionMovementsForThisTopic.put(pair, new HashSet<>()); partitionMovementsForThisTopic.get(pair).add(partition); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4ff4e19590a..9261966ccbd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -566,7 +566,7 @@ public abstract class AbstractCoordinator implements Closeable { // and send another join group request in next cycle. synchronized (AbstractCoordinator.this) { AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, - joinResponse.data().memberId(), null); + joinResponse.data().memberId(), null); AbstractCoordinator.this.rejoinNeeded = true; AbstractCoordinator.this.state = MemberState.UNJOINED; } @@ -654,7 +654,7 @@ public abstract class AbstractCoordinator implements Closeable { FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId); return client.send(node, requestBuilder) - .compose(new FindCoordinatorResponseHandler()); + .compose(new FindCoordinatorResponseHandler()); } private class FindCoordinatorResponseHandler extends RequestFutureAdapter { @@ -940,8 +940,8 @@ public abstract class AbstractCoordinator implements Closeable { this.heartbeatLatency = metrics.sensor("heartbeat-latency"); this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max", - this.metricGrpName, - "The max time taken to receive a response to a heartbeat request"), new Max()); + this.metricGrpName, + "The max time taken to receive a response to a heartbeat request"), new Max()); this.heartbeatLatency.add(createMeter(metrics, metricGrpName, "heartbeat", "heartbeats")); this.joinLatency = metrics.sensor("join-latency"); @@ -1148,8 +1148,8 @@ public abstract class AbstractCoordinator implements Closeable { if (o == null || getClass() != o.getClass()) return false; final Generation that = (Generation) o; return generationId == that.generationId && - Objects.equals(memberId, that.memberId) && - Objects.equals(protocol, that.protocol); + Objects.equals(memberId, that.memberId) && + Objects.equals(protocol, that.protocol); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 2b949a37deb..b31bf44ae7f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -262,7 +262,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { maybeUpdateJoinedSubscription(assignedPartitions); // give the assignor a chance to update internal state based on the received assignment - assignor.onAssignment(assignment); + assignor.onAssignment(assignment, generation); // reschedule the auto commit starting from now if (autoCommitEnabled) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index 8a4aef8b334..7bef8f7ff74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -45,6 +45,7 @@ import java.util.Map; * TopicPartitions => [Topic Partitions] * Topic => String * Partitions => [int32] + * UserData => Bytes * * * The current implementation assumes that future versions will not break compatibility. When diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java index 4a7c7a8bbd3..43fdaf3b01c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java @@ -58,13 +58,21 @@ public interface PartitionAssignor { */ Map assign(Cluster metadata, Map subscriptions); - /** * Callback which is invoked when a group member receives its assignment from the leader. * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)} */ void onAssignment(Assignment assignment); + /** + * Callback which is invoked when a group member receives its assignment from the leader. + * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)} + * @param generation The consumer group generation associated with this partition assignment (optional) + */ + default void onAssignment(Assignment assignment, int generation) { + onAssignment(assignment); + } + /** * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky") diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index 32ba16a4820..a1fe0cd0e1c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -27,11 +28,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Random; import java.util.Set; +import org.apache.kafka.clients.consumer.StickyAssignor.ConsumerUserData; import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.Utils; import org.junit.Test; @@ -46,7 +50,7 @@ public class StickyAssignorTest { Map partitionsPerTopic = new HashMap<>(); Map subscriptions = - Collections.singletonMap(consumerId, new Subscription(Collections.emptyList())); + Collections.singletonMap(consumerId, new Subscription(Collections.emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); assertEquals(Collections.singleton(consumerId), assignment.keySet()); @@ -235,10 +239,11 @@ public class StickyAssignorTest { String consumer2 = "consumer2"; subscriptions.put(consumer1, - new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1)))); + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get(consumer1), Optional.of(assignor.generation()))))); subscriptions.put(consumer2, new Subscription(topics(topic))); assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(partitions(tp(topic, 1), tp(topic, 2)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2), tp(topic, 1)), assignment.get(consumer1)); assertEquals(partitions(tp(topic, 0)), assignment.get(consumer2)); verifyValidityAndBalance(subscriptions, assignment); @@ -247,7 +252,8 @@ public class StickyAssignorTest { subscriptions.remove(consumer1); subscriptions.put(consumer2, - new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2)))); + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get(consumer2), Optional.of(assignor.generation()))))); assignment = assignor.assign(partitionsPerTopic, subscriptions); assertTrue(assignment.get(consumer2).contains(tp(topic, 0))); assertTrue(assignment.get(consumer2).contains(tp(topic, 1))); @@ -318,9 +324,11 @@ public class StickyAssignorTest { String topic2 = "topic2"; partitionsPerTopic.put(topic2, 3); subscriptions.put(consumer1, - new Subscription(topics(topic, topic2), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1)))); + new Subscription(topics(topic, topic2), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get(consumer1), Optional.of(assignor.generation()))))); subscriptions.put(consumer2, - new Subscription(topics(topic, topic2), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2)))); + new Subscription(topics(topic, topic2), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get(consumer2), Optional.of(assignor.generation()))))); assignment = assignor.assign(partitionsPerTopic, subscriptions); // verify balance verifyValidityAndBalance(subscriptions, assignment); @@ -335,9 +343,11 @@ public class StickyAssignorTest { partitionsPerTopic.remove(topic); subscriptions.put(consumer1, - new Subscription(topics(topic2), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer1)))); + new Subscription(topics(topic2), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get(consumer1), Optional.of(assignor.generation()))))); subscriptions.put(consumer2, - new Subscription(topics(topic2), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer2)))); + new Subscription(topics(topic2), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get(consumer2), Optional.of(assignor.generation()))))); assignment = assignor.assign(partitionsPerTopic, subscriptions); // verify balance verifyValidityAndBalance(subscriptions, assignment); @@ -360,7 +370,7 @@ public class StickyAssignorTest { Map subscriptions = new HashMap<>(); for (int i = 1; i < 20; i++) { - List topics = new ArrayList(); + List topics = new ArrayList<>(); for (int j = 1; j <= i; j++) topics.add(getTopicName(j, 20)); subscriptions.put(getConsumerName(i, 20), new Subscription(topics)); @@ -372,7 +382,8 @@ public class StickyAssignorTest { for (int i = 1; i < 20; i++) { String consumer = getConsumerName(i, 20); subscriptions.put(consumer, - new Subscription(subscriptions.get(consumer).topics(), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + new Subscription(subscriptions.get(consumer).topics(), + StickyAssignor.serializeTopicPartitionAssignment(new ConsumerUserData(assignment.get(consumer), Optional.of(assignor.generation()))))); } subscriptions.remove("consumer10"); @@ -409,7 +420,7 @@ public class StickyAssignorTest { Map subscriptions = new HashMap<>(); for (int i = 1; i < 9; i++) { - List topics = new ArrayList(); + List topics = new ArrayList<>(); for (int j = 1; j <= partitionsPerTopic.size(); j++) topics.add(getTopicName(j, 15)); subscriptions.put(getConsumerName(i, 9), new Subscription(topics)); @@ -421,7 +432,8 @@ public class StickyAssignorTest { for (int i = 1; i < 9; i++) { String consumer = getConsumerName(i, 9); subscriptions.put(consumer, - new Subscription(subscriptions.get(consumer).topics(), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + new Subscription(subscriptions.get(consumer).topics(), + StickyAssignor.serializeTopicPartitionAssignment(new ConsumerUserData(assignment.get(consumer), Optional.of(assignor.generation()))))); } subscriptions.remove(getConsumerName(5, 9)); @@ -442,7 +454,7 @@ public class StickyAssignorTest { Map subscriptions = new HashMap<>(); for (int i = 0; i < consumerCount; i++) { - List topics = new ArrayList(); + List topics = new ArrayList<>(); for (int j = 0; j < rand.nextInt(20); j++) topics.add(getTopicName(rand.nextInt(topicCount), topicCount)); subscriptions.put(getConsumerName(i, consumerCount), new Subscription(topics)); @@ -454,7 +466,8 @@ public class StickyAssignorTest { for (int i = 1; i < consumerCount; i++) { String consumer = getConsumerName(i, consumerCount); subscriptions.put(consumer, - new Subscription(subscriptions.get(consumer).topics(), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + new Subscription(subscriptions.get(consumer).topics(), + StickyAssignor.serializeTopicPartitionAssignment(new ConsumerUserData(assignment.get(consumer), Optional.of(assignor.generation()))))); } for (int i = 0; i < 50; ++i) { String c = getConsumerName(rand.nextInt(consumerCount), consumerCount); @@ -474,7 +487,7 @@ public class StickyAssignorTest { Map subscriptions = new HashMap<>(); for (int i = 0; i < 3; i++) { - List topics = new ArrayList(); + List topics = new ArrayList<>(); for (int j = i; j <= 3 * i - 2; j++) topics.add(getTopicName(j, 5)); subscriptions.put(getConsumerName(i, 3), new Subscription(topics)); @@ -526,7 +539,7 @@ public class StickyAssignorTest { List sub = Utils.sorted(getRandomSublist(topics)); String consumer = getConsumerName(i, maxNumConsumers); subscriptions.put(consumer, - new Subscription(sub, StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + new Subscription(sub, StickyAssignor.serializeTopicPartitionAssignment(new ConsumerUserData(assignment.get(consumer), Optional.of(assignor.generation()))))); } assignment = assignor.assign(partitionsPerTopic, subscriptions); @@ -544,13 +557,16 @@ public class StickyAssignorTest { Map subscriptions = new HashMap<>(); subscriptions.put("consumer01", new Subscription(topics("topic01", "topic02"), - StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic01", 0))))); + StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(partitions(tp("topic01", 0)), Optional.of(assignor.generation()))))); subscriptions.put("consumer02", new Subscription(topics("topic01", "topic02", "topic03", "topic04"), - StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic02", 0), tp("topic03", 0))))); + StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(partitions(tp("topic02", 0), tp("topic03", 0)), Optional.of(assignor.generation()))))); subscriptions.put("consumer03", new Subscription(topics("topic02", "topic03", "topic04", "topic05", "topic06"), - StickyAssignor.serializeTopicPartitionAssignment(partitions(tp("topic04", 0), tp("topic05", 0), tp("topic06", 0))))); + StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(partitions(tp("topic04", 0), tp("topic05", 0), tp("topic06", 0)), Optional.of(assignor.generation()))))); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); @@ -584,13 +600,16 @@ public class StickyAssignorTest { subscriptions.remove("consumer01"); subscriptions.put("consumer02", new Subscription(topics("topic01"), - StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer02")))); + StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get("consumer02"), Optional.of(assignor.generation()))))); subscriptions.put("consumer03", new Subscription(topics("topic01"), - StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer03")))); + StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get("consumer03"), Optional.of(assignor.generation()))))); subscriptions.put("consumer04", new Subscription(topics("topic01"), - StickyAssignor.serializeTopicPartitionAssignment(assignment.get("consumer04")))); + StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get("consumer04"), Optional.of(assignor.generation()))))); assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment); @@ -631,13 +650,265 @@ public class StickyAssignorTest { Map subscriptions = new HashMap<>(); subscriptions.put(consumer, new Subscription(topics(topic))); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); - subscriptions.put(consumer, new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(assignment.get(consumer)))); + subscriptions.put(consumer, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(assignment.get(consumer), Optional.of(1))))); assignment = assignor.assign(Collections.emptyMap(), subscriptions); assertEquals(assignment.size(), 1); assertTrue(assignment.get(consumer).isEmpty()); } + @Test + public void testAssignmentWithMultipleGenerations1() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + String consumer3 = "consumer3"; + + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 6); + Map subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); + subscriptions.put(consumer2, new Subscription(topics(topic))); + subscriptions.put(consumer3, new Subscription(topics(topic))); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + List r1partitions1 = assignment.get(consumer1); + List r1partitions2 = assignment.get(consumer2); + List r1partitions3 = assignment.get(consumer3); + assertTrue(r1partitions1.size() == 2 && r1partitions2.size() == 2 && r1partitions3.size() == 2); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + + subscriptions.put(consumer1, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(r1partitions1, Optional.of(1))))); + subscriptions.put(consumer2, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(r1partitions2, Optional.of(1))))); + subscriptions.remove(consumer3); + assignment = assignor.assign(partitionsPerTopic, subscriptions); + List r2partitions1 = assignment.get(consumer1); + List r2partitions2 = assignment.get(consumer2); + assertTrue(r2partitions1.size() == 3 && r2partitions2.size() == 3); + assertTrue(r2partitions1.containsAll(r1partitions1)); + assertTrue(r2partitions2.containsAll(r1partitions2)); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + + assertTrue(!Collections.disjoint(r2partitions2, r1partitions3)); + subscriptions.remove(consumer1); + subscriptions.put(consumer2, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(r2partitions2, Optional.of(2))))); + subscriptions.put(consumer3, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(r1partitions3, Optional.of(1))))); + assignment = assignor.assign(partitionsPerTopic, subscriptions); + List r3partitions2 = assignment.get(consumer2); + List r3partitions3 = assignment.get(consumer3); + assertTrue(r3partitions2.size() == 3 && r3partitions3.size() == 3); + assertTrue(Collections.disjoint(r3partitions2, r3partitions3)); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + } + + @Test + public void testAssignmentWithMultipleGenerations2() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + String consumer3 = "consumer3"; + + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 6); + Map subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); + subscriptions.put(consumer2, new Subscription(topics(topic))); + subscriptions.put(consumer3, new Subscription(topics(topic))); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + List r1partitions1 = assignment.get(consumer1); + List r1partitions2 = assignment.get(consumer2); + List r1partitions3 = assignment.get(consumer3); + assertTrue(r1partitions1.size() == 2 && r1partitions2.size() == 2 && r1partitions3.size() == 2); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + + subscriptions.remove(consumer1); + subscriptions.put(consumer2, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(r1partitions2, Optional.of(1))))); + subscriptions.remove(consumer3); + assignment = assignor.assign(partitionsPerTopic, subscriptions); + List r2partitions2 = assignment.get(consumer2); + assertEquals(6, r2partitions2.size()); + assertTrue(r2partitions2.containsAll(r1partitions2)); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + + subscriptions.put(consumer1, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(r1partitions1, Optional.of(1))))); + subscriptions.put(consumer2, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(r2partitions2, Optional.of(2))))); + subscriptions.put(consumer3, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(r1partitions3, Optional.of(1))))); + assignment = assignor.assign(partitionsPerTopic, subscriptions); + List r3partitions1 = assignment.get(consumer1); + List r3partitions2 = assignment.get(consumer2); + List r3partitions3 = assignment.get(consumer3); + assertTrue(r3partitions1.size() == 2 && r3partitions2.size() == 2 && r3partitions3.size() == 2); + assertEquals(r1partitions1, r3partitions1); + assertEquals(r1partitions2, r3partitions2); + assertEquals(r1partitions3, r3partitions3); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + } + + @Test + public void testAssignmentWithConflictingPreviousGenerations() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + String consumer3 = "consumer3"; + + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 6); + Map subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); + subscriptions.put(consumer2, new Subscription(topics(topic))); + subscriptions.put(consumer3, new Subscription(topics(topic))); + + TopicPartition tp0 = new TopicPartition(topic, 0); + TopicPartition tp1 = new TopicPartition(topic, 1); + TopicPartition tp2 = new TopicPartition(topic, 2); + TopicPartition tp3 = new TopicPartition(topic, 3); + TopicPartition tp4 = new TopicPartition(topic, 4); + TopicPartition tp5 = new TopicPartition(topic, 5); + + List c1partitions0 = partitions(tp0, tp1, tp4); + List c2partitions0 = partitions(tp0, tp2, tp3); + List c3partitions0 = partitions(tp3, tp4, tp5); + subscriptions.put(consumer1, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(c1partitions0, Optional.of(1))))); + subscriptions.put(consumer2, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(c2partitions0, Optional.of(1))))); + subscriptions.put(consumer3, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(c3partitions0, Optional.of(2))))); + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + List c1partitions = assignment.get(consumer1); + List c2partitions = assignment.get(consumer2); + List c3partitions = assignment.get(consumer3); + + assertTrue(c1partitions.size() == 2 && c2partitions.size() == 2 && c3partitions.size() == 2); + assertTrue(c1partitions0.containsAll(c1partitions)); + assertTrue(c2partitions0.containsAll(c2partitions)); + assertTrue(c3partitions0.containsAll(c3partitions)); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + } + + @Test + public void testSchemaBackwardCompatibility() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + String consumer3 = "consumer3"; + + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + Map subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); + subscriptions.put(consumer2, new Subscription(topics(topic))); + subscriptions.put(consumer3, new Subscription(topics(topic))); + + TopicPartition tp0 = new TopicPartition(topic, 0); + TopicPartition tp1 = new TopicPartition(topic, 1); + TopicPartition tp2 = new TopicPartition(topic, 2); + + List c1partitions0 = partitions(tp0, tp2); + List c2partitions0 = partitions(tp1); + subscriptions.put(consumer1, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(c1partitions0, Optional.of(1))))); + subscriptions.put(consumer2, + new Subscription(topics(topic), serializeTopicPartitionAssignmentToOldSchema(c2partitions0))); + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + List c1partitions = assignment.get(consumer1); + List c2partitions = assignment.get(consumer2); + List c3partitions = assignment.get(consumer3); + + assertTrue(c1partitions.size() == 1 && c2partitions.size() == 1 && c3partitions.size() == 1); + assertTrue(c1partitions0.containsAll(c1partitions)); + assertTrue(c2partitions0.containsAll(c2partitions)); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + } + + @Test + public void testConflictingPreviousAssignments() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + Map subscriptions = new HashMap<>(); + subscriptions.put(consumer1, new Subscription(topics(topic))); + subscriptions.put(consumer2, new Subscription(topics(topic))); + + TopicPartition tp0 = new TopicPartition(topic, 0); + TopicPartition tp1 = new TopicPartition(topic, 1); + + // both c1 and c2 have partition 1 assigned to them in generation 1 + List c1partitions0 = partitions(tp0, tp1); + List c2partitions0 = partitions(tp0, tp1); + subscriptions.put(consumer1, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(c1partitions0, Optional.of(1))))); + subscriptions.put(consumer2, + new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment( + new ConsumerUserData(c2partitions0, Optional.of(1))))); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + List c1partitions = assignment.get(consumer1); + List c2partitions = assignment.get(consumer2); + + assertTrue(c1partitions.size() == 1 && c2partitions.size() == 1); + verifyValidityAndBalance(subscriptions, assignment); + assertTrue(isFullyBalanced(assignment)); + assertTrue(assignor.isSticky()); + } + + private static ByteBuffer serializeTopicPartitionAssignmentToOldSchema(List partitions) { + Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0); + List topicAssignments = new ArrayList<>(); + for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) { + Struct topicAssignment = new Struct(StickyAssignor.TOPIC_ASSIGNMENT); + topicAssignment.set(StickyAssignor.TOPIC_KEY_NAME, topicEntry.getKey()); + topicAssignment.set(StickyAssignor.PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); + topicAssignments.add(topicAssignment); + } + struct.set(StickyAssignor.TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray()); + ByteBuffer buffer = ByteBuffer.allocate(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct)); + StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct); + buffer.flip(); + return buffer; + } + private String getTopicName(int i, int maxNum) { return getCanonicalName("t", i, maxNum); }