diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 349fc2a4d3d..f2ee21e9da6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -29,9 +29,9 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; import org.apache.kafka.clients.admin.internals.AdminMetadataManager; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ElectionType; @@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient { for (DescribedGroupMember groupMember : members) { Set partitions = Collections.emptySet(); if (groupMember.memberAssignment().length > 0) { - final PartitionAssignor.Assignment assignment = ConsumerProtocol. + final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol. deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); partitions = new HashSet<>(assignment.partitions()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 7eb34d4aeee..8a18bd5a77a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig { * partition.assignment.strategy */ public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy"; - private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used"; + private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name or class type of the assignor implementing the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. A custom assignor that implements ConsumerPartitionAssignor can be plugged in"; /** * auto.offset.reset diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java new file mode 100644 index 00000000000..e17894b213d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import java.util.Optional; + +public class ConsumerGroupMetadata { + private String groupId; + private int generationId; + private String memberId; + Optional groupInstanceId; + + public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional groupInstanceId) { + this.groupId = groupId; + this.generationId = generationId; + this.memberId = memberId; + this.groupInstanceId = groupInstanceId; + } + + public String groupId() { + return groupId; + } + + public int generationId() { + return generationId; + } + + public String memberId() { + return memberId; + } + + public Optional groupInstanceId() { + return groupInstanceId; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java new file mode 100644 index 00000000000..72d5d6e806b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.TopicPartition; + +/** + * This interface is used to define custom partition assignment for use in + * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe + * to the topics they are interested in and forward their subscriptions to a Kafka broker serving + * as the group coordinator. The coordinator selects one member to perform the group assignment and + * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called + * to perform the assignment and the results are forwarded back to each respective members + * + * In some cases, it is useful to forward additional metadata to the assignor in order to make + * assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom + * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation + * can use this user data to forward the rackId belonging to each member. + */ +public interface ConsumerPartitionAssignor { + + /** + * Return serialized data that will be included in the {@link Subscription} sent to the leader + * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information) + * + * @return optional join subscription user data + */ + default ByteBuffer subscriptionUserData(Set topics) { + return null; + } + + /** + * Perform the group assignment given the member subscriptions and current cluster metadata. + * @param metadata Current topic/broker metadata known by consumer + * @param subscriptions Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)} + * @return A map from the members to their respective assignment. This should have one entry + * for each member in the input subscription map. + */ + GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions); + + /** + * Callback which is invoked when a group member receives its assignment from the leader. + * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)} + * @param metadata Additional metadata on the consumer (optional) + */ + default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + } + + /** + * Indicate which rebalance protocol this assignor works with; + * By default it should always work with {@link RebalanceProtocol#EAGER}. + */ + default List supportedProtocols() { + return Collections.singletonList(RebalanceProtocol.EAGER); + } + + /** + * Return the version of the assignor which indicates how the user metadata encodings + * and the assignment algorithm gets evolved. + */ + default short version() { + return (short) 0; + } + + /** + * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required + * to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG} + * @return non-null unique name + */ + String name(); + + final class Subscription { + private final List topics; + private final ByteBuffer userData; + private final List ownedPartitions; + private Optional groupInstanceId; + + public Subscription(List topics, ByteBuffer userData, List ownedPartitions) { + this.topics = topics; + this.userData = userData; + this.ownedPartitions = ownedPartitions; + this.groupInstanceId = Optional.empty(); + } + + public Subscription(List topics, ByteBuffer userData) { + this(topics, userData, Collections.emptyList()); + } + + public Subscription(List topics) { + this(topics, null, Collections.emptyList()); + } + + public List topics() { + return topics; + } + + public ByteBuffer userData() { + return userData; + } + + public List ownedPartitions() { + return ownedPartitions; + } + + public void setGroupInstanceId(Optional groupInstanceId) { + this.groupInstanceId = groupInstanceId; + } + + public Optional groupInstanceId() { + return groupInstanceId; + } + } + + final class Assignment { + private List partitions; + private ByteBuffer userData; + + public Assignment(List partitions, ByteBuffer userData) { + this.partitions = partitions; + this.userData = userData; + } + + public Assignment(List partitions) { + this(partitions, null); + } + + public List partitions() { + return partitions; + } + + public ByteBuffer userData() { + return userData; + } + } + + final class GroupSubscription { + private final Map subscriptions; + + public GroupSubscription(Map subscriptions) { + this.subscriptions = subscriptions; + } + + public Map groupSubscription() { + return subscriptions; + } + } + + final class GroupAssignment { + private final Map assignments; + + public GroupAssignment(Map assignments) { + this.assignments = assignments; + } + + public Map groupAssignment() { + return assignments; + } + } + + enum RebalanceProtocol { + EAGER((byte) 0), COOPERATIVE((byte) 1); + + private final byte id; + + RebalanceProtocol(byte id) { + this.id = id; + } + + public byte id() { + return id; + } + + public static RebalanceProtocol forId(byte id) { + switch (id) { + case 0: + return EAGER; + case 1: + return COOPERATIVE; + default: + throw new IllegalArgumentException("Unknown rebalance protocol id: " + id); + } + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fa5cc99a188..30944b330da 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry; import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -581,7 +580,7 @@ public class KafkaConsumer implements Consumer { private final long requestTimeoutMs; private final int defaultApiTimeoutMs; private volatile boolean closed = false; - private List assignors; + private List assignors; // currentThread holds the threadId of the current thread accessing KafkaConsumer // and is used to prevent multi-threaded access @@ -768,7 +767,7 @@ public class KafkaConsumer implements Consumer { heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation this.assignors = config.getConfiguredInstances( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, - PartitionAssignor.class); + ConsumerPartitionAssignor.class); // no coordinator will be constructed for the default (null) group id this.coordinator = groupId == null ? null : @@ -833,7 +832,7 @@ public class KafkaConsumer implements Consumer { long retryBackoffMs, long requestTimeoutMs, int defaultApiTimeoutMs, - List assignors, + List assignors, String groupId) { this.log = logContext.logger(getClass()); this.clientId = clientId; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index 3c7d010f5f7..3311cd89094 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -365,18 +365,17 @@ public class StickyAssignor extends AbstractPartitionAssignor { } @Override - public void onAssignment(Assignment assignment, int generation) { + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { memberAssignment = assignment.partitions(); - this.generation = generation; + this.generation = metadata.generationId(); } @Override - public Subscription subscription(Set topics) { + public ByteBuffer subscriptionUserData(Set topics) { if (memberAssignment == null) - return new Subscription(new ArrayList<>(topics)); + return null; - return new Subscription(new ArrayList<>(topics), - serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation)))); + return serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation))); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java index 2487daa4eaa..3b966b0736b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; @@ -33,7 +34,7 @@ import java.util.Set; * Abstract assignor implementation which does some common grunt work (in particular collecting * partition counts which are always needed in assignors). */ -public abstract class AbstractPartitionAssignor implements PartitionAssignor { +public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssignor { private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class); /** @@ -47,12 +48,8 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { Map subscriptions); @Override - public Subscription subscription(Set topics) { - return new Subscription(new ArrayList<>(topics)); - } - - @Override - public Map assign(Cluster metadata, Map subscriptions) { + public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscriptions) { + Map subscriptions = groupSubscriptions.groupSubscription(); Set allSubscribedTopics = new HashSet<>(); for (Map.Entry subscriptionEntry : subscriptions.entrySet()) allSubscribedTopics.addAll(subscriptionEntry.getValue().topics()); @@ -72,12 +69,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { Map assignments = new HashMap<>(); for (Map.Entry> assignmentEntry : rawAssignments.entrySet()) assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue())); - return assignments; - } - - @Override - public void onAssignment(Assignment assignment) { - // this assignor maintains no internal state, so nothing to do + return new GroupAssignment(assignments); } protected static void put(Map> map, K key, V value) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 4866986d37e..a28119dd4b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -18,13 +18,16 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.RebalanceProtocol; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -78,7 +81,7 @@ import java.util.stream.Collectors; public final class ConsumerCoordinator extends AbstractCoordinator { private final GroupRebalanceConfig rebalanceConfig; private final Logger log; - private final List assignors; + private final List assignors; private final ConsumerMetadata metadata; private final ConsumerCoordinatorMetrics sensors; private final SubscriptionState subscriptions; @@ -128,7 +131,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, LogContext logContext, ConsumerNetworkClient client, - List assignors, + List assignors, ConsumerMetadata metadata, SubscriptionState subscriptions, Metrics metrics, @@ -170,13 +173,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (!assignors.isEmpty()) { List supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols()); - for (PartitionAssignor assignor : assignors) { + for (ConsumerPartitionAssignor assignor : assignors) { supportedProtocols.retainAll(assignor.supportedProtocols()); } if (supportedProtocols.isEmpty()) { throw new IllegalArgumentException("Specified assignors " + - assignors.stream().map(PartitionAssignor::name).collect(Collectors.toSet()) + + assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) + " do not have commonly supported rebalance protocol"); } @@ -201,8 +204,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.joinedSubscription = subscriptions.subscription(); JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(); - for (PartitionAssignor assignor : assignors) { - Subscription subscription = assignor.subscription(joinedSubscription); + for (ConsumerPartitionAssignor assignor : assignors) { + Subscription subscription = new Subscription(new ArrayList<>(joinedSubscription), + assignor.subscriptionUserData(joinedSubscription), + subscriptions.assignedPartitionsList()); ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription); protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol() @@ -220,8 +225,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { metadata.requestUpdateForNewTopics(); } - private PartitionAssignor lookupAssignor(String name) { - for (PartitionAssignor assignor : this.assignors) { + private ConsumerPartitionAssignor lookupAssignor(String name) { + for (ConsumerPartitionAssignor assignor : this.assignors) { if (assignor.name().equals(name)) return assignor; } @@ -261,7 +266,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (!isLeader) assignmentSnapshot = null; - PartitionAssignor assignor = lookupAssignor(assignmentStrategy); + ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy); if (assignor == null) throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); @@ -285,7 +290,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { maybeUpdateJoinedSubscription(assignedPartitions); // give the assignor a chance to update internal state based on the received assignment - assignor.onAssignment(assignment, generation); + ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId); + assignor.onAssignment(assignment, metadata); // reschedule the auto commit starting from now if (autoCommitEnabled) @@ -314,10 +320,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { case COOPERATIVE: assignAndRevoke(listener, assignedPartitions, ownedPartitions); - if (assignment.error() == ConsumerProtocol.AssignmentError.NEED_REJOIN) { - requestRejoin(); - } - break; } @@ -470,20 +472,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator { protected Map performAssignment(String leaderId, String assignmentStrategy, List allSubscriptions) { - PartitionAssignor assignor = lookupAssignor(assignmentStrategy); + ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy); if (assignor == null) throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); Set allSubscribedTopics = new HashSet<>(); Map subscriptions = new HashMap<>(); - // collect all the owned partitions - Map ownedPartitions = new HashMap<>(); for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) { Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata())); subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId())); subscriptions.put(memberSubscription.memberId(), subscription); allSubscribedTopics.addAll(subscription.topics()); - ownedPartitions.putAll(subscription.ownedPartitions().stream().collect(Collectors.toMap(item -> item, item -> memberSubscription.memberId()))); } // the leader will begin watching for changes to any of the topics the group is interested in, @@ -494,16 +493,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions); - Map assignments = assignor.assign(metadata.fetch(), subscriptions); - - switch (protocol) { - case EAGER: - break; - - case COOPERATIVE: - adjustAssignment(ownedPartitions, assignments); - break; - } + Map assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment(); // user-customized assignor may have created some topics that are not in the subscription list // and assign their partitions to the members; in this case we would like to update the leader's @@ -547,40 +537,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { return groupAssignment; } - private void adjustAssignment(final Map ownedPartitions, - final Map assignments) { - boolean revocationsNeeded = false; - Set assignedPartitions = new HashSet<>(); - for (final Map.Entry entry : assignments.entrySet()) { - final Assignment assignment = entry.getValue(); - assignedPartitions.addAll(assignment.partitions()); - - // update the assignment if the partition is owned by another different owner - List updatedPartitions = assignment.partitions().stream() - .filter(tp -> ownedPartitions.containsKey(tp) && !entry.getKey().equals(ownedPartitions.get(tp))) - .collect(Collectors.toList()); - if (!updatedPartitions.equals(assignment.partitions())) { - assignment.updatePartitions(updatedPartitions); - revocationsNeeded = true; - } - } - - // for all owned but not assigned partitions, blindly add them to assignment - for (final Map.Entry entry : ownedPartitions.entrySet()) { - final TopicPartition tp = entry.getKey(); - if (!assignedPartitions.contains(tp)) { - assignments.get(entry.getValue()).partitions().add(tp); - } - } - - // if revocations are triggered, tell everyone to re-join immediately. - if (revocationsNeeded) { - for (final Assignment assignment : assignments.values()) { - assignment.setError(ConsumerProtocol.AssignmentError.NEED_REJOIN); - } - } - } - @Override protected void onJoinPrepare(int generation, String memberId) { // commit offsets prior to rebalance if auto-commit enabled diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index d05d5b06906..e852e62cd46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.clients.consumer.internals; +import java.util.Collections; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; @@ -49,7 +52,6 @@ import java.util.Map; * Topic => String * Partitions => [int32] * UserData => Bytes - * ErrorCode => [int16] * * * Version 0 format: @@ -85,11 +87,11 @@ public class ConsumerProtocol { public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions"; public static final String USER_DATA_KEY_NAME = "user_data"; - public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Assignment error code"); - public static final short CONSUMER_PROTOCOL_V0 = 0; public static final short CONSUMER_PROTOCOL_V1 = 1; + public static final short CONSUMER_PROTOCL_LATEST_VERSION = CONSUMER_PROTOCOL_V1; + public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema( new Field(VERSION_KEY_NAME, Type.INT16)); private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA) @@ -116,36 +118,9 @@ public class ConsumerProtocol { public static final Schema ASSIGNMENT_V1 = new Schema( new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)), - new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES), - ERROR_CODE); - - public enum AssignmentError { - NONE(0), - NEED_REJOIN(1); + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES)); - private final short code; - - AssignmentError(final int code) { - this.code = (short) code; - } - - public short code() { - return code; - } - - public static AssignmentError fromCode(final short code) { - switch (code) { - case 0: - return NONE; - case 1: - return NEED_REJOIN; - default: - throw new IllegalArgumentException("Unknown error code: " + code); - } - } - } - - public static ByteBuffer serializeSubscriptionV0(PartitionAssignor.Subscription subscription) { + public static ByteBuffer serializeSubscriptionV0(Subscription subscription) { Struct struct = new Struct(SUBSCRIPTION_V0); struct.set(USER_DATA_KEY_NAME, subscription.userData()); struct.set(TOPICS_KEY_NAME, subscription.topics().toArray()); @@ -157,7 +132,7 @@ public class ConsumerProtocol { return buffer; } - public static ByteBuffer serializeSubscriptionV1(PartitionAssignor.Subscription subscription) { + public static ByteBuffer serializeSubscriptionV1(Subscription subscription) { Struct struct = new Struct(SUBSCRIPTION_V1); struct.set(USER_DATA_KEY_NAME, subscription.userData()); struct.set(TOPICS_KEY_NAME, subscription.topics().toArray()); @@ -178,8 +153,12 @@ public class ConsumerProtocol { return buffer; } - public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) { - switch (subscription.version()) { + public static ByteBuffer serializeSubscription(Subscription subscription) { + return serializeSubscription(subscription, CONSUMER_PROTOCL_LATEST_VERSION); + } + + public static ByteBuffer serializeSubscription(Subscription subscription, short version) { + switch (version) { case CONSUMER_PROTOCOL_V0: return serializeSubscriptionV0(subscription); @@ -192,17 +171,17 @@ public class ConsumerProtocol { } } - public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) { + public static Subscription deserializeSubscriptionV0(ByteBuffer buffer) { Struct struct = SUBSCRIPTION_V0.read(buffer); ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); List topics = new ArrayList<>(); for (Object topicObj : struct.getArray(TOPICS_KEY_NAME)) topics.add((String) topicObj); - return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData); + return new Subscription(topics, userData, Collections.emptyList()); } - public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) { + public static Subscription deserializeSubscriptionV1(ByteBuffer buffer) { Struct struct = SUBSCRIPTION_V1.read(buffer); ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); List topics = new ArrayList<>(); @@ -218,10 +197,10 @@ public class ConsumerProtocol { } } - return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions); + return new Subscription(topics, userData, ownedPartitions); } - public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) { + public static Subscription deserializeSubscription(ByteBuffer buffer) { Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); Short version = header.getShort(VERSION_KEY_NAME); @@ -241,7 +220,7 @@ public class ConsumerProtocol { } } - public static ByteBuffer serializeAssignmentV0(PartitionAssignor.Assignment assignment) { + public static ByteBuffer serializeAssignmentV0(Assignment assignment) { Struct struct = new Struct(ASSIGNMENT_V0); struct.set(USER_DATA_KEY_NAME, assignment.userData()); List topicAssignments = new ArrayList<>(); @@ -261,7 +240,7 @@ public class ConsumerProtocol { return buffer; } - public static ByteBuffer serializeAssignmentV1(PartitionAssignor.Assignment assignment) { + public static ByteBuffer serializeAssignmentV1(Assignment assignment) { Struct struct = new Struct(ASSIGNMENT_V1); struct.set(USER_DATA_KEY_NAME, assignment.userData()); List topicAssignments = new ArrayList<>(); @@ -273,7 +252,6 @@ public class ConsumerProtocol { topicAssignments.add(topicAssignment); } struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray()); - struct.set(ERROR_CODE, assignment.error().code); ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + ASSIGNMENT_V1.sizeOf(struct)); CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer); @@ -282,8 +260,12 @@ public class ConsumerProtocol { return buffer; } - public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment assignment) { - switch (assignment.version()) { + public static ByteBuffer serializeAssignment(Assignment assignment) { + return serializeAssignment(assignment, CONSUMER_PROTOCL_LATEST_VERSION); + } + + public static ByteBuffer serializeAssignment(Assignment assignment, short version) { + switch (version) { case CONSUMER_PROTOCOL_V0: return serializeAssignmentV0(assignment); @@ -296,7 +278,7 @@ public class ConsumerProtocol { } } - public static PartitionAssignor.Assignment deserializeAssignmentV0(ByteBuffer buffer) { + public static Assignment deserializeAssignmentV0(ByteBuffer buffer) { Struct struct = ASSIGNMENT_V0.read(buffer); ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); List partitions = new ArrayList<>(); @@ -307,27 +289,14 @@ public class ConsumerProtocol { partitions.add(new TopicPartition(topic, (Integer) partitionObj)); } } - return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V0, partitions, userData); + return new Assignment(partitions, userData); } - public static PartitionAssignor.Assignment deserializeAssignmentV1(ByteBuffer buffer) { - Struct struct = ASSIGNMENT_V1.read(buffer); - ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); - List partitions = new ArrayList<>(); - for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) { - Struct assignment = (Struct) structObj; - String topic = assignment.getString(TOPIC_KEY_NAME); - for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) { - partitions.add(new TopicPartition(topic, (Integer) partitionObj)); - } - } - - AssignmentError error = AssignmentError.fromCode(struct.get(ERROR_CODE)); - - return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V1, partitions, userData, error); + public static Assignment deserializeAssignmentV1(ByteBuffer buffer) { + return deserializeAssignmentV0(buffer); } - public static PartitionAssignor.Assignment deserializeAssignment(ByteBuffer buffer) { + public static Assignment deserializeAssignment(ByteBuffer buffer) { Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); Short version = header.getShort(VERSION_KEY_NAME); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java index c26f68462ea..b3f2ada5c41 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java @@ -18,18 +18,12 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.types.SchemaException; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0; -import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V1; - /** * This interface is used to define custom partition assignment for use in * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe @@ -43,6 +37,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSU * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation * can use this user data to forward the rackId belonging to each member. */ +@Deprecated public interface PartitionAssignor { /** @@ -79,21 +74,6 @@ public interface PartitionAssignor { onAssignment(assignment); } - /** - * Indicate which rebalance protocol this assignor works with; - * By default it should always work with {@link RebalanceProtocol#EAGER}. - */ - default List supportedProtocols() { - return Collections.singletonList(RebalanceProtocol.EAGER); - } - - /** - * Return the version of the assignor which indicates how the user metadata encodings - * and the assignment algorithm gets evolved. - */ - default short version() { - return (short) 0; - } /** * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky") @@ -101,156 +81,52 @@ public interface PartitionAssignor { */ String name(); - enum RebalanceProtocol { - EAGER((byte) 0), COOPERATIVE((byte) 1); - - private final byte id; - - RebalanceProtocol(byte id) { - this.id = id; - } - - public byte id() { - return id; - } - - public static RebalanceProtocol forId(byte id) { - switch (id) { - case 0: - return EAGER; - case 1: - return COOPERATIVE; - default: - throw new IllegalArgumentException("Unknown rebalance protocol id: " + id); - } - } - } - class Subscription { - private final Short version; private final List topics; private final ByteBuffer userData; - private final List ownedPartitions; - private Optional groupInstanceId; - Subscription(Short version, - List topics, - ByteBuffer userData, - List ownedPartitions) { - this.version = version; + public Subscription(List topics, ByteBuffer userData) { this.topics = topics; this.userData = userData; - this.ownedPartitions = ownedPartitions; - this.groupInstanceId = Optional.empty(); - - if (version < CONSUMER_PROTOCOL_V0) - throw new SchemaException("Unsupported subscription version: " + version); - - if (version < CONSUMER_PROTOCOL_V1 && !ownedPartitions.isEmpty()) - throw new IllegalArgumentException("Subscription version smaller than 1 should not have owned partitions"); - } - - Subscription(Short version, List topics, ByteBuffer userData) { - this(version, topics, userData, Collections.emptyList()); - } - - public Subscription(List topics, ByteBuffer userData, List ownedPartitions) { - this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions); - } - - public Subscription(List topics, ByteBuffer userData) { - this(CONSUMER_PROTOCOL_V1, topics, userData); } public Subscription(List topics) { this(topics, ByteBuffer.wrap(new byte[0])); } - Short version() { - return version; - } - public List topics() { return topics; } - public List ownedPartitions() { - return ownedPartitions; - } - public ByteBuffer userData() { return userData; } - public void setGroupInstanceId(Optional groupInstanceId) { - this.groupInstanceId = groupInstanceId; - } - - public Optional groupInstanceId() { - return groupInstanceId; - } - @Override public String toString() { return "Subscription(" + - "version=" + version + - ", topics=" + topics + - ", ownedPartitions=" + ownedPartitions + - ", group.instance.id=" + groupInstanceId + ")"; + "topics=" + topics + + ')'; } } class Assignment { - private final Short version; - private List partitions; + private final List partitions; private final ByteBuffer userData; - private ConsumerProtocol.AssignmentError error; - Assignment(Short version, List partitions, ByteBuffer userData, ConsumerProtocol.AssignmentError error) { - this.version = version; + public Assignment(List partitions, ByteBuffer userData) { this.partitions = partitions; this.userData = userData; - this.error = error; - - if (version < CONSUMER_PROTOCOL_V0) - throw new SchemaException("Unsupported subscription version: " + version); - - if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.AssignmentError.NONE) - throw new IllegalArgumentException("Assignment version smaller than 1 should not have error code."); - } - - Assignment(Short version, List partitions, ByteBuffer userData) { - this(version, partitions, userData, ConsumerProtocol.AssignmentError.NONE); - } - - public Assignment(List partitions, ByteBuffer userData) { - this(CONSUMER_PROTOCOL_V1, partitions, userData); } public Assignment(List partitions) { this(partitions, ByteBuffer.wrap(new byte[0])); } - Short version() { - return version; - } - public List partitions() { return partitions; } - public ConsumerProtocol.AssignmentError error() { - return error; - } - - public void updatePartitions(List partitions) { - this.partitions = partitions; - } - - public void setError(ConsumerProtocol.AssignmentError error) { - this.error = error; - } - public ByteBuffer userData() { return userData; } @@ -258,10 +134,8 @@ public interface PartitionAssignor { @Override public String toString() { return "Assignment(" + - "version=" + version + - ", partitions=" + partitions + - ", error=" + error + - ')'; + "partitions=" + partitions + + ')'; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 3f1cf98be9e..af834ce71b1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import java.util.ArrayList; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; @@ -387,6 +388,13 @@ public class SubscriptionState { return new HashSet<>(this.assignment.partitionSet()); } + /** + * @return a modifiable copy of the currently assigned partitions as a list + */ + public synchronized List assignedPartitionsList() { + return new ArrayList<>(this.assignment.partitionSet()); + } + /** * Provides the number of assigned partitions in a thread safe manner. * @return the number of assigned partitions. diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 711c8f92de8..769f58c3caf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -21,9 +21,9 @@ import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.KafkaException; @@ -1222,7 +1222,7 @@ public class KafkaAdminClientTest { topicPartitions.add(1, myTopicPartition1); topicPartitions.add(2, myTopicPartition2); - final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions)); + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions)); byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; memberAssignment.get(memberAssignmentBytes); @@ -1282,7 +1282,7 @@ public class KafkaAdminClientTest { topicPartitions.add(1, myTopicPartition1); topicPartitions.add(2, myTopicPartition2); - final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions)); + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions)); byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; memberAssignment.get(memberAssignmentBytes); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index c1adf1932ec..1227c27ad60 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.Fetcher; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -396,7 +395,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -430,7 +429,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -465,7 +464,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final PartitionAssignor assignor = new RoundRobinAssignor(); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -489,7 +488,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final PartitionAssignor assignor = new RoundRobinAssignor(); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -512,7 +511,7 @@ public class KafkaConsumerTest { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singleton(tp0)); @@ -587,7 +586,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -611,7 +610,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -636,7 +635,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId); @@ -663,7 +662,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, Optional.empty()); @@ -686,7 +685,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -724,7 +723,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -764,7 +763,7 @@ public class KafkaConsumerTest { initMetadata(client, partitionCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null); @@ -782,7 +781,7 @@ public class KafkaConsumerTest { @Test public void testChangingRegexSubscription() { - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); String otherTopic = "other"; TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0); @@ -828,7 +827,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -878,7 +877,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final PartitionAssignor assignor = new RoundRobinAssignor(); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -908,7 +907,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer)); @@ -948,7 +947,7 @@ public class KafkaConsumerTest { initMetadata(client, tpCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -1062,7 +1061,7 @@ public class KafkaConsumerTest { initMetadata(client, tpCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); @@ -1124,7 +1123,7 @@ public class KafkaConsumerTest { initMetadata(client, tpCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -1180,7 +1179,7 @@ public class KafkaConsumerTest { initMetadata(client, tpCounts); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); @@ -1234,7 +1233,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -1429,7 +1428,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -1457,7 +1456,7 @@ public class KafkaConsumerTest { coordinator); // join group - final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic))); + final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(singletonList(topic))); // This member becomes the leader String memberId = "memberId"; @@ -1512,7 +1511,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty()); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -1649,7 +1648,7 @@ public class KafkaConsumerTest { initMetadata(client, singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - PartitionAssignor assignor = new RangeAssignor(); + ConsumerPartitionAssignor assignor = new RangeAssignor(); client.createPendingAuthenticationError(node, 0); return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); @@ -1675,7 +1674,7 @@ public class KafkaConsumerTest { subscription, new LogContext(), new ClusterResourceListeners()); } - private Node prepareRebalance(MockClient client, Node node, final Set subscribedTopics, PartitionAssignor assignor, List partitions, Node coordinator) { + private Node prepareRebalance(MockClient client, Node node, final Set subscribedTopics, ConsumerPartitionAssignor assignor, List partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); @@ -1692,7 +1691,7 @@ public class KafkaConsumerTest { assertTrue(protocolIterator.hasNext()); ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata()); - PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata); return subscribedTopics.equals(new HashSet<>(subscription.topics())); } }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator); @@ -1703,7 +1702,7 @@ public class KafkaConsumerTest { return coordinator; } - private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List partitions, Node coordinator) { + private Node prepareRebalance(MockClient client, Node node, ConsumerPartitionAssignor assignor, List partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); @@ -1764,7 +1763,7 @@ public class KafkaConsumerTest { return new OffsetCommitResponse(responseData); } - private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) { + private JoinGroupResponse joinGroupFollowerResponse(ConsumerPartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) { return new JoinGroupResponse( new JoinGroupResponseData() .setErrorCode(error.code()) @@ -1777,7 +1776,7 @@ public class KafkaConsumerTest { } private SyncGroupResponse syncGroupResponse(List partitions, Errors error) { - ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); + ByteBuffer buf = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions)); return new SyncGroupResponse( new SyncGroupResponseData() .setErrorCode(error.code()) @@ -1848,7 +1847,7 @@ public class KafkaConsumerTest { KafkaClient client, SubscriptionState subscription, ConsumerMetadata metadata, - PartitionAssignor assignor, + ConsumerPartitionAssignor assignor, boolean autoCommitEnabled, Optional groupInstanceId) { return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId); @@ -1865,7 +1864,7 @@ public class KafkaConsumerTest { KafkaClient client, SubscriptionState subscription, ConsumerMetadata metadata, - PartitionAssignor assignor, + ConsumerPartitionAssignor assignor, boolean autoCommitEnabled, String groupId, Optional groupInstanceId) { @@ -1885,7 +1884,7 @@ public class KafkaConsumerTest { Deserializer keyDeserializer = new StringDeserializer(); Deserializer valueDeserializer = new StringDeserializer(); - List assignors = singletonList(assignor); + List assignors = singletonList(assignor); ConsumerInterceptors interceptors = new ConsumerInterceptors<>(Collections.emptyList()); Metrics metrics = new Metrics(); @@ -1985,7 +1984,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 1)); Cluster cluster = metadata.fetch(); - PartitionAssignor assignor = new RoundRobinAssignor(); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); String invalidTopicName = "topic abc"; // Invalid topic name due to space diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java index f08ca144bf6..118e60a3d12 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.consumer; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.junit.Test; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java index fa6840649a1..02fb9ffba40 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.junit.Test; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index 89f0d37ff0c..6dd30620000 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -30,7 +30,7 @@ import java.util.Random; import java.util.Set; import org.apache.kafka.clients.consumer.StickyAssignor.ConsumerUserData; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index a0878fb8899..a81e73e022d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; @@ -125,9 +126,9 @@ public class ConsumerCoordinatorTest { private final MockTime time = new MockTime(); private GroupRebalanceConfig rebalanceConfig; - private final PartitionAssignor.RebalanceProtocol protocol; + private final ConsumerPartitionAssignor.RebalanceProtocol protocol; private final MockPartitionAssignor partitionAssignor; - private final List assignors; + private final List assignors; private MockClient client; private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap() { { @@ -144,7 +145,7 @@ public class ConsumerCoordinatorTest { private MockCommitCallback mockOffsetCommitCallback; private ConsumerCoordinator coordinator; - public ConsumerCoordinatorTest(final PartitionAssignor.RebalanceProtocol protocol) { + public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) { this.protocol = protocol; this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol)); this.assignors = Collections.singletonList(partitionAssignor); @@ -153,7 +154,7 @@ public class ConsumerCoordinatorTest { @Parameterized.Parameters(name = "rebalance protocol = {0}") public static Collection data() { final List values = new ArrayList<>(); - for (final PartitionAssignor.RebalanceProtocol protocol: PartitionAssignor.RebalanceProtocol.values()) { + for (final ConsumerPartitionAssignor.RebalanceProtocol protocol: ConsumerPartitionAssignor.RebalanceProtocol.values()) { values.add(new Object[]{protocol}); } return values; @@ -198,20 +199,20 @@ public class ConsumerCoordinatorTest { @Test public void testSelectRebalanceProtcol() { - List assignors = new ArrayList<>(); - assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.EAGER))); - assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.COOPERATIVE))); + List assignors = new ArrayList<>(); + assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER))); + assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE))); // no commonly supported protocols assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)); assignors.clear(); - assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE))); - assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE))); + assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE))); + assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE))); // select higher indexed (more advanced) protocols try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) { - assertEquals(PartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol()); + assertEquals(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol()); } } @@ -553,7 +554,7 @@ public class ConsumerCoordinatorTest { final int addCount = 1; // with eager protocol we will call revoke on the old assignment as well - if (protocol == PartitionAssignor.RebalanceProtocol.EAGER) { + if (protocol == ConsumerPartitionAssignor.RebalanceProtocol.EAGER) { revokeCount += 1; } @@ -670,7 +671,7 @@ public class ConsumerCoordinatorTest { JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next(); ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata()); - PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata); metadata.rewind(); return subscription.topics().containsAll(updatedSubscription); } @@ -2326,7 +2327,7 @@ public class ConsumerCoordinatorTest { private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig, final Metrics metrics, - final List assignors, + final List assignors, final boolean autoCommitEnabled) { return new ConsumerCoordinator( rebalanceConfig, @@ -2385,7 +2386,7 @@ public class ConsumerCoordinatorTest { Errors error) { List metadata = new ArrayList<>(); for (Map.Entry> subscriptionEntry : subscriptions.entrySet()) { - PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue()); + ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue()); ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription); metadata.add(new JoinGroupResponseData.JoinGroupResponseMember() .setMemberId(subscriptionEntry.getKey()) @@ -2416,7 +2417,7 @@ public class ConsumerCoordinatorTest { } private SyncGroupResponse syncGroupResponse(List partitions, Errors error) { - ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); + ByteBuffer buf = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions)); return new SyncGroupResponse( new SyncGroupResponseData() .setErrorCode(error.code()) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java index 9e601b034e5..3cf7be51665 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; @@ -39,7 +39,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME; import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_DATA_KEY_NAME; import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; import static org.apache.kafka.test.TestUtils.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -54,7 +53,7 @@ public class ConsumerProtocolTest { @Test public void serializeDeserializeMetadata() { - Subscription subscription = new Subscription(Arrays.asList("foo", "bar")); + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0])); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); assertEquals(subscription.topics(), parsedSubscription.topics()); @@ -64,7 +63,7 @@ public class ConsumerProtocolTest { @Test public void serializeDeserializeMetadataAndGroupInstanceId() { - Subscription subscription = new Subscription(Arrays.asList("foo", "bar")); + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0])); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); @@ -85,8 +84,8 @@ public class ConsumerProtocolTest { @Test public void deserializeOldSubscriptionVersion() { - Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null); - ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null); + ByteBuffer buffer = ConsumerProtocol.serializeSubscriptionV0(subscription); Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); assertEquals(parsedSubscription.topics(), parsedSubscription.topics()); assertNull(parsedSubscription.userData()); @@ -95,7 +94,7 @@ public class ConsumerProtocolTest { @Test public void deserializeNewSubscriptionWithOldVersion() { - Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2)); + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2)); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0 Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); @@ -145,7 +144,7 @@ public class ConsumerProtocolTest { @Test public void serializeDeserializeAssignment() { List partitions = Arrays.asList(tp1, tp2); - ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment(partitions)); + ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment(partitions, ByteBuffer.wrap(new byte[0]))); Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer); assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); assertEquals(0, parsedAssignment.userData().limit()); @@ -160,29 +159,6 @@ public class ConsumerProtocolTest { assertNull(parsedAssignment.userData()); } - @Test - public void deserializeOldAssignmentVersion() { - List partitions = Arrays.asList(tp1, tp2); - ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 0, partitions, null)); - Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer); - assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); - assertNull(parsedAssignment.userData()); - assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error()); - } - - @Test - public void deserializeNewAssignmentWithOldVersion() { - List partitions = Collections.singletonList(tp1); - ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 1, partitions, null, ConsumerProtocol.AssignmentError.NEED_REJOIN)); - // ignore the version assuming it is the old byte code, as it will blindly deserialize as 0 - Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); - header.getShort(VERSION_KEY_NAME); - Assignment parsedAssignment = ConsumerProtocol.deserializeAssignmentV0(buffer); - assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); - assertNull(parsedAssignment.userData()); - assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error()); - } - @Test public void deserializeFutureAssignmentVersion() { // verify that a new version which adds a field is still parseable @@ -191,7 +167,6 @@ public class ConsumerProtocolTest { Schema assignmentSchemaV100 = new Schema( new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)), new Field(USER_DATA_KEY_NAME, Type.BYTES), - ERROR_CODE, new Field("foo", Type.STRING)); Struct assignmentV100 = new Struct(assignmentSchemaV100); @@ -200,7 +175,6 @@ public class ConsumerProtocolTest { .set(ConsumerProtocol.TOPIC_KEY_NAME, tp1.topic()) .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{tp1.partition()})}); assignmentV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0])); - assignmentV100.set(ERROR_CODE.name, ConsumerProtocol.AssignmentError.NEED_REJOIN.code()); assignmentV100.set("foo", "bar"); Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA); @@ -212,8 +186,7 @@ public class ConsumerProtocolTest { buffer.flip(); - PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer); + Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer); assertEquals(toSet(Collections.singletonList(tp1)), toSet(assignment.partitions())); - assertEquals(ConsumerProtocol.AssignmentError.NEED_REJOIN, assignment.error()); } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 6761b0cad94..4e067166779 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -31,8 +31,8 @@ import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManag import kafka.server.HostedPartition import kafka.utils.{KafkaScheduler, MockTime, TestUtils} import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription import org.apache.kafka.clients.consumer.internals.ConsumerProtocol -import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 714bb0ea210..fa5f5115d65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.streams.processor.internals; +import java.nio.ByteBuffer; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; @@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; -public class StreamsPartitionAssignor implements PartitionAssignor, Configurable { +public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable { final static int UNKNOWN = -1; private final static int VERSION_ONE = 1; @@ -309,7 +311,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable } @Override - public Subscription subscription(final Set topics) { + public ByteBuffer subscriptionUserData(final Set topics) { // Adds the following information to subscription // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks @@ -327,7 +329,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable taskManager.updateSubscriptionsFromMetadata(topics); - return new Subscription(new ArrayList<>(topics), data.encode()); + return data.encode(); } private Map errorAssignment(final Map clientsMetadata, @@ -371,8 +373,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable * 3. within each client, tasks are assigned to consumer clients in round-robin manner. */ @Override - public Map assign(final Cluster metadata, - final Map subscriptions) { + public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscriptions) { + final Map subscriptions = groupSubscriptions.groupSubscription(); // construct the client metadata from the decoded subscription info final Map clientMetadataMap = new HashMap<>(); final Set futureConsumers = new HashSet<>(); @@ -446,7 +448,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable !metadata.topics().contains(topic)) { log.error("Missing source topic {} durign assignment. Returning error {}.", topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name()); - return errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code); + return new GroupAssignment(errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code)); } } for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { @@ -644,7 +646,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable assignment = computeNewAssignment(clientMetadataMap, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion); } - return assignment; + return new GroupAssignment(assignment); } private Map computeNewAssignment(final Map clientsMetadata, @@ -777,7 +779,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable * @throws TaskAssignmentException if there is no task id for one of the partitions specified */ @Override - public void onAssignment(final Assignment assignment) { + public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) { final List partitions = new ArrayList<>(assignment.partitions()); partitions.sort(PARTITION_COMPARATOR); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 5fa6653ba3a..616deaf3904 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -16,7 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -146,7 +147,7 @@ public class StreamsPartitionAssignorTest { EasyMock.replay(taskManager); } - private Map subscriptions; + private Map subscriptions; @Before public void setUp() { @@ -200,7 +201,9 @@ public class StreamsPartitionAssignorTest { mockTaskManager(prevTasks, cachedTasks, processId, builder); configurePartitionAssignor(Collections.emptyMap()); - final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); + + final Set topics = Utils.mkSet("topic1", "topic2"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); Collections.sort(subscription.topics()); assertEquals(asList("topic1", "topic2"), subscription.topics()); @@ -236,16 +239,16 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode())); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), @@ -320,13 +323,13 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode())); - final Map assignments = partitionAssignor.assign(localMetadata, subscriptions); + final Map assignments = partitionAssignor.assign(localMetadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)), @@ -365,10 +368,10 @@ public class StreamsPartitionAssignorTest { // will throw exception if it fails subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode() )); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assignment info final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); @@ -399,12 +402,12 @@ public class StreamsPartitionAssignorTest { configurePartitionAssignor(Collections.emptyMap()); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode() )); // initially metadata is empty - Map assignments = partitionAssignor.assign(emptyMetadata, subscriptions); + Map assignments = partitionAssignor.assign(emptyMetadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions assertEquals(Collections.emptySet(), @@ -417,7 +420,7 @@ public class StreamsPartitionAssignorTest { assertEquals(0, allActiveTasks.size()); // then metadata gets populated - assignments = partitionAssignor.assign(metadata, subscriptions); + assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)), Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()))); @@ -455,16 +458,16 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, emptyTasks, userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, emptyTasks, userEndPoint).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, emptyTasks, userEndPoint).encode())); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and @@ -521,16 +524,16 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, emptyTasks, emptyTasks, userEndPoint).encode())); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match assertEquals(2, assignments.get("consumer10").partitions().size()); @@ -609,16 +612,16 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode())); subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode())); subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode())); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // the first consumer final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); @@ -666,7 +669,7 @@ public class StreamsPartitionAssignorTest { standbyTasks.put(task2, Utils.mkSet(t3p2)); final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState); - final PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode()); + final ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode()); final Capture capturedCluster = EasyMock.newCapture(); taskManager.setPartitionsByHostState(hostState); @@ -677,7 +680,7 @@ public class StreamsPartitionAssignorTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager); - partitionAssignor.onAssignment(assignment); + partitionAssignor.onAssignment(assignment, null); EasyMock.verify(taskManager); @@ -704,10 +707,10 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(internalTopicManager); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()) ); - partitionAssignor.assign(metadata, subscriptions); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check prepared internal topics assertEquals(1, internalTopicManager.readyTopics.size()); @@ -738,10 +741,10 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(internalTopicManager); subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()) ); - partitionAssignor.assign(metadata, subscriptions); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // check prepared internal topics assertEquals(2, internalTopicManager.readyTopics.size()); @@ -790,11 +793,11 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(mockInternalTopicManager); subscriptions.put(client, - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( asList("topic1", "topic3"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()) ); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); final Map expectedCreatedInternalTopics = new HashMap<>(); expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4); @@ -841,7 +844,8 @@ public class StreamsPartitionAssignorTest { uuid1, builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint)); - final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); + final Set topics = Utils.mkSet("input"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData()); assertEquals("localhost:8080", subscriptionInfo.userEndPoint()); } @@ -863,11 +867,11 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer)); subscriptions.put("consumer1", - new PartitionAssignor.Subscription(topics, + new ConsumerPartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()) ); - final Map assignments = partitionAssignor.assign(metadata, subscriptions); - final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); + final Map assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + final ConsumerPartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData()); final Set topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); assertEquals( @@ -961,11 +965,11 @@ public class StreamsPartitionAssignorTest { partitionAssignor.setInternalTopicManager(mockInternalTopicManager); subscriptions.put(client, - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("unknownTopic"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()) ); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true)); @@ -985,7 +989,7 @@ public class StreamsPartitionAssignorTest { EasyMock.expectLastCall(); EasyMock.replay(taskManager); - partitionAssignor.onAssignment(createAssignment(hostState)); + partitionAssignor.onAssignment(createAssignment(hostState), null); EasyMock.verify(taskManager); } @@ -1015,18 +1019,18 @@ public class StreamsPartitionAssignorTest { mockClientSupplier.restoreConsumer)); subscriptions.put("consumer1", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()) ); subscriptions.put("consumer2", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()) ); final Set allPartitions = Utils.mkSet(t1p0, t1p1, t1p2); - final Map assign = partitionAssignor.assign(metadata, subscriptions); - final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1"); + final Map assign = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); + final ConsumerPartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1"); final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData()); final Set consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080)); final Set consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090)); @@ -1109,12 +1113,12 @@ public class StreamsPartitionAssignorTest { private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion, final int otherVersion) { subscriptions.put("consumer1", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()) ); subscriptions.put("consumer2", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() ) @@ -1126,7 +1130,7 @@ public class StreamsPartitionAssignorTest { UUID.randomUUID(), builder); partitionAssignor.configure(configProps()); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(assignment.size(), equalTo(2)); assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion)); @@ -1142,7 +1146,8 @@ public class StreamsPartitionAssignorTest { builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100)); - final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + final Set topics = Utils.mkSet("topic1"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); } @@ -1180,7 +1185,8 @@ public class StreamsPartitionAssignorTest { builder); configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue)); - final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + final Set topics = Utils.mkSet("topic1"); + final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); } @@ -1200,12 +1206,12 @@ public class StreamsPartitionAssignorTest { }; subscriptions.put("consumer1", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode()) ); subscriptions.put("future-consumer", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), encodeFutureSubscription()) ); @@ -1216,7 +1222,7 @@ public class StreamsPartitionAssignorTest { UUID.randomUUID(), builder); partitionAssignor.configure(configProps()); - final Map assignment = partitionAssignor.assign(metadata, subscriptions); + final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); assertThat(assignment.size(), equalTo(2)); assertThat( @@ -1252,12 +1258,12 @@ public class StreamsPartitionAssignorTest { private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) { subscriptions.put("consumer1", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()) ); subscriptions.put("future-consumer", - new PartitionAssignor.Subscription( + new ConsumerPartitionAssignor.Subscription( Collections.singletonList("topic1"), encodeFutureSubscription()) ); @@ -1270,24 +1276,24 @@ public class StreamsPartitionAssignorTest { partitionAssignor.configure(configProps()); try { - partitionAssignor.assign(metadata, subscriptions); + partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); fail("Should have thrown IllegalStateException"); } catch (final IllegalStateException expected) { // pass } } - private PartitionAssignor.Assignment createAssignment(final Map> firstHostState) { + private ConsumerPartitionAssignor.Assignment createAssignment(final Map> firstHostState) { final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.emptyMap(), firstHostState); - return new PartitionAssignor.Assignment( + return new ConsumerPartitionAssignor.Assignment( Collections.emptyList(), info.encode()); } private AssignmentInfo checkAssignment(final Set expectedTopics, - final PartitionAssignor.Assignment assignment) { + final ConsumerPartitionAssignor.Assignment assignment) { // This assumed 1) DefaultPartitionGrouper is used, and 2) there is an only one topic group. diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 27bee810d85..0b2d0b319aa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -18,8 +18,9 @@ package org.apache.kafka.streams.tests; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -113,7 +114,7 @@ public class StreamsUpgradeTest { } @Override - public Subscription subscription(final Set topics) { + public ByteBuffer subscriptionUserData(final Set topics) { // Adds the following information to subscription // 1. Client UUID (a unique id assigned to an instance of KafkaStreams) // 2. Task ids of previously running tasks @@ -133,13 +134,13 @@ public class StreamsUpgradeTest { taskManager.updateSubscriptionsFromMetadata(topics); - return new Subscription(new ArrayList<>(topics), data.encode()); + return data.encode(); } @Override - public void onAssignment(final PartitionAssignor.Assignment assignment) { + public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, final ConsumerGroupMetadata metadata) { try { - super.onAssignment(assignment); + super.onAssignment(assignment, metadata); return; } catch (final TaskAssignmentException cannotProcessFutureVersion) { // continue @@ -183,15 +184,15 @@ public class StreamsUpgradeTest { } @Override - public Map assign(final Cluster metadata, - final Map subscriptions) { + public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) { + final Map subscriptions = groupSubscription.groupSubscription(); Map assignment = null; final Map downgradedSubscriptions = new HashMap<>(); for (final Subscription subscription : subscriptions.values()) { final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); if (info.version() < SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1) { - assignment = super.assign(metadata, subscriptions); + assignment = super.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); break; } } @@ -219,7 +220,7 @@ public class StreamsUpgradeTest { info.userEndPoint()) .encode())); } - assignment = super.assign(metadata, downgradedSubscriptions); + assignment = super.assign(metadata, new GroupSubscription(downgradedSubscriptions)).groupAssignment(); bumpUsedVersion = true; bumpSupportedVersion = true; } @@ -238,7 +239,7 @@ public class StreamsUpgradeTest { .encode())); } - return newAssignment; + return new GroupAssignment(newAssignment); } }