Browse Source

KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108)

Main changes of this PR:

* Deprecate old consumer.internal.PartitionAssignor and add public consumer.ConsumerPartitionAssignor with all OOTB assignors migrated to new interface
* Refactor the assignor's assignment/subscription related classes into an easier-to-evolve API
* Removed version number from classes as it is only needed for serialization/deserialization
* Other previously-discussed cleanup included in this PR:

* Remove Assignment.error added in pt 1
* Remove ConsumerCoordinator#adjustAssignment added in pt 2

Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
pull/7119/head
A. Sophie Blee-Goldman 5 years ago committed by Guozhang Wang
parent
commit
69d86a197f
  1. 4
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  2. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  3. 50
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
  4. 206
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
  5. 7
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  6. 11
      clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
  7. 18
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
  8. 86
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
  9. 95
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
  10. 142
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
  11. 8
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  12. 6
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  13. 69
      clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  14. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
  15. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
  16. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
  17. 31
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
  18. 45
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
  19. 2
      core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
  20. 20
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
  21. 126
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
  22. 21
      streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

4
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -29,9 +29,9 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; @@ -29,9 +29,9 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.ElectionType;
@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient { @@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient {
for (DescribedGroupMember groupMember : members) {
Set<TopicPartition> partitions = Collections.emptySet();
if (groupMember.memberAssignment().length > 0) {
final PartitionAssignor.Assignment assignment = ConsumerProtocol.
final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
partitions = new HashSet<>(assignment.partitions());
}

2
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig { @@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig {
* <code>partition.assignment.strategy</code>
*/
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name or class type of the assignor implementing the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. A custom assignor that implements ConsumerPartitionAssignor can be plugged in";
/**
* <code>auto.offset.reset</code>

50
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java

@ -0,0 +1,50 @@ @@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import java.util.Optional;
/**
 * An immutable metadata struct describing the consumer group a member belongs to,
 * passed to {@code ConsumerPartitionAssignor#onAssignment} so assignors can read
 * group/generation information without depending on coordinator internals.
 */
public class ConsumerGroupMetadata {
    // All fields are set once in the constructor and never mutated,
    // so they are declared final (the original left them mutable and
    // groupInstanceId was inconsistently package-private).
    private final String groupId;
    private final int generationId;
    private final String memberId;
    private final Optional<String> groupInstanceId;

    /**
     * @param groupId         the consumer group id
     * @param generationId    the group generation at the time this metadata was captured
     * @param memberId        the member id assigned by the group coordinator
     * @param groupInstanceId the static-membership instance id, or {@link Optional#empty()}
     *                        for a dynamic member
     */
    public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional<String> groupInstanceId) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }

    public String groupId() {
        return groupId;
    }

    public int generationId() {
        return generationId;
    }

    public String memberId() {
        return memberId;
    }

    public Optional<String> groupInstanceId() {
        return groupInstanceId;
    }
}

206
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java

@ -0,0 +1,206 @@ @@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
/**
* This interface is used to define custom partition assignment for use in
* {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
* to the topics they are interested in and forward their subscriptions to a Kafka broker serving
* as the group coordinator. The coordinator selects one member to perform the group assignment and
* propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called
* to perform the assignment and the results are forwarded back to each respective members
*
* In some cases, it is useful to forward additional metadata to the assignor in order to make
* assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom
* userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
* can use this user data to forward the rackId belonging to each member.
*/
/**
 * This interface is used to define custom partition assignment for use in
 * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
 * to the topics they are interested in and forward their subscriptions to a Kafka broker serving
 * as the group coordinator. The coordinator selects one member to perform the group assignment and
 * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called
 * to perform the assignment and the results are forwarded back to each respective member.
 *
 * In some cases, it is useful to forward additional metadata to the assignor in order to make
 * assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom
 * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
 * can use this user data to forward the rackId belonging to each member.
 */
public interface ConsumerPartitionAssignor {

    /**
     * Return serialized data that will be included in the {@link Subscription} sent to the leader
     * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} (e.g. local host/rack information)
     *
     * @param topics the topics this member is subscribed to
     * @return optional join subscription user data; may be null when the assignor has none
     */
    default ByteBuffer subscriptionUserData(Set<String> topics) {
        return null;
    }

    /**
     * Perform the group assignment given the member subscriptions and current cluster metadata.
     *
     * @param metadata Current topic/broker metadata known by consumer
     * @param subscriptions Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)}
     * @return A map from the members to their respective assignment. This should have one entry
     *         for each member in the input subscription map.
     */
    GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions);

    /**
     * Callback which is invoked when a group member receives its assignment from the leader.
     *
     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)}
     * @param metadata Additional metadata on the consumer (optional)
     */
    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
    }

    /**
     * Indicate which rebalance protocol this assignor works with;
     * by default it should always work with {@link RebalanceProtocol#EAGER}.
     */
    default List<RebalanceProtocol> supportedProtocols() {
        return Collections.singletonList(RebalanceProtocol.EAGER);
    }

    /**
     * Return the version of the assignor which indicates how the user metadata encodings
     * and the assignment algorithm get evolved.
     */
    default short version() {
        return (short) 0;
    }

    /**
     * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required
     * to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
     *
     * @return non-null unique name
     */
    String name();

    /**
     * A single member's subscription: the topics it wants, optional assignor user data,
     * the partitions it currently owns (for cooperative rebalancing), and, once known,
     * its static-membership group instance id.
     */
    final class Subscription {
        private final List<String> topics;
        private final ByteBuffer userData;
        private final List<TopicPartition> ownedPartitions;
        // Mutable: filled in by the coordinator after deserialization via setGroupInstanceId.
        private Optional<String> groupInstanceId;

        public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
            this.topics = topics;
            this.userData = userData;
            this.ownedPartitions = ownedPartitions;
            this.groupInstanceId = Optional.empty();
        }

        public Subscription(List<String> topics, ByteBuffer userData) {
            this(topics, userData, Collections.emptyList());
        }

        public Subscription(List<String> topics) {
            this(topics, null, Collections.emptyList());
        }

        public List<String> topics() {
            return topics;
        }

        public ByteBuffer userData() {
            return userData;
        }

        public List<TopicPartition> ownedPartitions() {
            return ownedPartitions;
        }

        public void setGroupInstanceId(Optional<String> groupInstanceId) {
            this.groupInstanceId = groupInstanceId;
        }

        public Optional<String> groupInstanceId() {
            return groupInstanceId;
        }
    }

    /**
     * A single member's assignment: the partitions it was given plus optional
     * assignor user data to be delivered with it.
     */
    final class Assignment {
        // Immutable after construction; declared final for consistency with the
        // other nested value classes (the original left these mutable).
        private final List<TopicPartition> partitions;
        private final ByteBuffer userData;

        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
            this.partitions = partitions;
            this.userData = userData;
        }

        public Assignment(List<TopicPartition> partitions) {
            this(partitions, null);
        }

        public List<TopicPartition> partitions() {
            return partitions;
        }

        public ByteBuffer userData() {
            return userData;
        }
    }

    /**
     * The subscriptions of all group members, keyed by member id; the input to
     * {@link #assign(Cluster, GroupSubscription)}.
     */
    final class GroupSubscription {
        private final Map<String, Subscription> subscriptions;

        public GroupSubscription(Map<String, Subscription> subscriptions) {
            this.subscriptions = subscriptions;
        }

        public Map<String, Subscription> groupSubscription() {
            return subscriptions;
        }
    }

    /**
     * The assignments for all group members, keyed by member id; the result of
     * {@link #assign(Cluster, GroupSubscription)}.
     */
    final class GroupAssignment {
        private final Map<String, Assignment> assignments;

        public GroupAssignment(Map<String, Assignment> assignments) {
            this.assignments = assignments;
        }

        public Map<String, Assignment> groupAssignment() {
            return assignments;
        }
    }

    /**
     * The rebalance protocol defines the partition-ownership semantics during a rebalance:
     * EAGER revokes everything before rejoining, COOPERATIVE allows retaining owned partitions.
     */
    enum RebalanceProtocol {
        EAGER((byte) 0), COOPERATIVE((byte) 1);

        private final byte id;

        RebalanceProtocol(byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static RebalanceProtocol forId(byte id) {
            switch (id) {
                case 0:
                    return EAGER;
                case 1:
                    return COOPERATIVE;
                default:
                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
            }
        }
    }
}

7
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; @@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@ -581,7 +580,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -581,7 +580,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final long requestTimeoutMs;
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private List<PartitionAssignor> assignors;
private List<ConsumerPartitionAssignor> assignors;
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
@ -768,7 +767,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -768,7 +767,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation
this.assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
ConsumerPartitionAssignor.class);
// no coordinator will be constructed for the default (null) group id
this.coordinator = groupId == null ? null :
@ -833,7 +832,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -833,7 +832,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
long retryBackoffMs,
long requestTimeoutMs,
int defaultApiTimeoutMs,
List<PartitionAssignor> assignors,
List<ConsumerPartitionAssignor> assignors,
String groupId) {
this.log = logContext.logger(getClass());
this.clientId = clientId;

11
clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java

@ -365,18 +365,17 @@ public class StickyAssignor extends AbstractPartitionAssignor { @@ -365,18 +365,17 @@ public class StickyAssignor extends AbstractPartitionAssignor {
}
@Override
public void onAssignment(Assignment assignment, int generation) {
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
memberAssignment = assignment.partitions();
this.generation = generation;
this.generation = metadata.generationId();
}
@Override
public Subscription subscription(Set<String> topics) {
public ByteBuffer subscriptionUserData(Set<String> topics) {
if (memberAssignment == null)
return new Subscription(new ArrayList<>(topics));
return null;
return new Subscription(new ArrayList<>(topics),
serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation))));
return serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation)));
}
@Override

18
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
@ -33,7 +34,7 @@ import java.util.Set; @@ -33,7 +34,7 @@ import java.util.Set;
* Abstract assignor implementation which does some common grunt work (in particular collecting
* partition counts which are always needed in assignors).
*/
public abstract class AbstractPartitionAssignor implements PartitionAssignor {
public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssignor {
private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);
/**
@ -47,12 +48,8 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { @@ -47,12 +48,8 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
Map<String, Subscription> subscriptions);
@Override
public Subscription subscription(Set<String> topics) {
return new Subscription(new ArrayList<>(topics));
}
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscriptions) {
Map<String, Subscription> subscriptions = groupSubscriptions.groupSubscription();
Set<String> allSubscribedTopics = new HashSet<>();
for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());
@ -72,12 +69,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor { @@ -72,12 +69,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
Map<String, Assignment> assignments = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
return assignments;
}
@Override
public void onAssignment(Assignment assignment) {
// this assignor maintains no internal state, so nothing to do
return new GroupAssignment(assignments);
}
protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {

86
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

@ -18,13 +18,16 @@ package org.apache.kafka.clients.consumer.internals; @@ -18,13 +18,16 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.RebalanceProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@ -78,7 +81,7 @@ import java.util.stream.Collectors; @@ -78,7 +81,7 @@ import java.util.stream.Collectors;
public final class ConsumerCoordinator extends AbstractCoordinator {
private final GroupRebalanceConfig rebalanceConfig;
private final Logger log;
private final List<PartitionAssignor> assignors;
private final List<ConsumerPartitionAssignor> assignors;
private final ConsumerMetadata metadata;
private final ConsumerCoordinatorMetrics sensors;
private final SubscriptionState subscriptions;
@ -128,7 +131,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -128,7 +131,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
LogContext logContext,
ConsumerNetworkClient client,
List<PartitionAssignor> assignors,
List<ConsumerPartitionAssignor> assignors,
ConsumerMetadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
@ -170,13 +173,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -170,13 +173,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (!assignors.isEmpty()) {
List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
for (PartitionAssignor assignor : assignors) {
for (ConsumerPartitionAssignor assignor : assignors) {
supportedProtocols.retainAll(assignor.supportedProtocols());
}
if (supportedProtocols.isEmpty()) {
throw new IllegalArgumentException("Specified assignors " +
assignors.stream().map(PartitionAssignor::name).collect(Collectors.toSet()) +
assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) +
" do not have commonly supported rebalance protocol");
}
@ -201,8 +204,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -201,8 +204,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
this.joinedSubscription = subscriptions.subscription();
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(joinedSubscription);
for (ConsumerPartitionAssignor assignor : assignors) {
Subscription subscription = new Subscription(new ArrayList<>(joinedSubscription),
assignor.subscriptionUserData(joinedSubscription),
subscriptions.assignedPartitionsList());
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
@ -220,8 +225,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -220,8 +225,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
metadata.requestUpdateForNewTopics();
}
private PartitionAssignor lookupAssignor(String name) {
for (PartitionAssignor assignor : this.assignors) {
private ConsumerPartitionAssignor lookupAssignor(String name) {
for (ConsumerPartitionAssignor assignor : this.assignors) {
if (assignor.name().equals(name))
return assignor;
}
@ -261,7 +266,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -261,7 +266,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (!isLeader)
assignmentSnapshot = null;
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@ -285,7 +290,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -285,7 +290,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
maybeUpdateJoinedSubscription(assignedPartitions);
// give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment, generation);
ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
assignor.onAssignment(assignment, metadata);
// reschedule the auto commit starting from now
if (autoCommitEnabled)
@ -314,10 +320,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -314,10 +320,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
case COOPERATIVE:
assignAndRevoke(listener, assignedPartitions, ownedPartitions);
if (assignment.error() == ConsumerProtocol.AssignmentError.NEED_REJOIN) {
requestRejoin();
}
break;
}
@ -470,20 +472,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -470,20 +472,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
// collect all the owned partitions
Map<TopicPartition, String> ownedPartitions = new HashMap<>();
for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
subscriptions.put(memberSubscription.memberId(), subscription);
allSubscribedTopics.addAll(subscription.topics());
ownedPartitions.putAll(subscription.ownedPartitions().stream().collect(Collectors.toMap(item -> item, item -> memberSubscription.memberId())));
}
// the leader will begin watching for changes to any of the topics the group is interested in,
@ -494,16 +493,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -494,16 +493,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);
Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), subscriptions);
switch (protocol) {
case EAGER:
break;
case COOPERATIVE:
adjustAssignment(ownedPartitions, assignments);
break;
}
Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
// user-customized assignor may have created some topics that are not in the subscription list
// and assign their partitions to the members; in this case we would like to update the leader's
@ -547,40 +537,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -547,40 +537,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return groupAssignment;
}
private void adjustAssignment(final Map<TopicPartition, String> ownedPartitions,
final Map<String, Assignment> assignments) {
boolean revocationsNeeded = false;
Set<TopicPartition> assignedPartitions = new HashSet<>();
for (final Map.Entry<String, Assignment> entry : assignments.entrySet()) {
final Assignment assignment = entry.getValue();
assignedPartitions.addAll(assignment.partitions());
// update the assignment if the partition is owned by another different owner
List<TopicPartition> updatedPartitions = assignment.partitions().stream()
.filter(tp -> ownedPartitions.containsKey(tp) && !entry.getKey().equals(ownedPartitions.get(tp)))
.collect(Collectors.toList());
if (!updatedPartitions.equals(assignment.partitions())) {
assignment.updatePartitions(updatedPartitions);
revocationsNeeded = true;
}
}
// for all owned but not assigned partitions, blindly add them to assignment
for (final Map.Entry<TopicPartition, String> entry : ownedPartitions.entrySet()) {
final TopicPartition tp = entry.getKey();
if (!assignedPartitions.contains(tp)) {
assignments.get(entry.getValue()).partitions().add(tp);
}
}
// if revocations are triggered, tell everyone to re-join immediately.
if (revocationsNeeded) {
for (final Assignment assignment : assignments.values()) {
assignment.setError(ConsumerProtocol.AssignmentError.NEED_REJOIN);
}
}
}
@Override
protected void onJoinPrepare(int generation, String memberId) {
// commit offsets prior to rebalance if auto-commit enabled

95
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java

@ -16,6 +16,9 @@ @@ -16,6 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
@ -49,7 +52,6 @@ import java.util.Map; @@ -49,7 +52,6 @@ import java.util.Map;
* Topic => String
* Partitions => [int32]
* UserData => Bytes
* ErrorCode => [int16]
* </pre>
*
* Version 0 format:
@ -85,11 +87,11 @@ public class ConsumerProtocol { @@ -85,11 +87,11 @@ public class ConsumerProtocol {
public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
public static final String USER_DATA_KEY_NAME = "user_data";
public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Assignment error code");
public static final short CONSUMER_PROTOCOL_V0 = 0;
public static final short CONSUMER_PROTOCOL_V1 = 1;
public static final short CONSUMER_PROTOCL_LATEST_VERSION = CONSUMER_PROTOCOL_V1;
public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema(
new Field(VERSION_KEY_NAME, Type.INT16));
private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
@ -116,36 +118,9 @@ public class ConsumerProtocol { @@ -116,36 +118,9 @@ public class ConsumerProtocol {
public static final Schema ASSIGNMENT_V1 = new Schema(
new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES),
ERROR_CODE);
public enum AssignmentError {
NONE(0),
NEED_REJOIN(1);
new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
private final short code;
AssignmentError(final int code) {
this.code = (short) code;
}
public short code() {
return code;
}
public static AssignmentError fromCode(final short code) {
switch (code) {
case 0:
return NONE;
case 1:
return NEED_REJOIN;
default:
throw new IllegalArgumentException("Unknown error code: " + code);
}
}
}
public static ByteBuffer serializeSubscriptionV0(PartitionAssignor.Subscription subscription) {
public static ByteBuffer serializeSubscriptionV0(Subscription subscription) {
Struct struct = new Struct(SUBSCRIPTION_V0);
struct.set(USER_DATA_KEY_NAME, subscription.userData());
struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
@ -157,7 +132,7 @@ public class ConsumerProtocol { @@ -157,7 +132,7 @@ public class ConsumerProtocol {
return buffer;
}
public static ByteBuffer serializeSubscriptionV1(PartitionAssignor.Subscription subscription) {
public static ByteBuffer serializeSubscriptionV1(Subscription subscription) {
Struct struct = new Struct(SUBSCRIPTION_V1);
struct.set(USER_DATA_KEY_NAME, subscription.userData());
struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
@ -178,8 +153,12 @@ public class ConsumerProtocol { @@ -178,8 +153,12 @@ public class ConsumerProtocol {
return buffer;
}
public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) {
switch (subscription.version()) {
public static ByteBuffer serializeSubscription(Subscription subscription) {
return serializeSubscription(subscription, CONSUMER_PROTOCL_LATEST_VERSION);
}
public static ByteBuffer serializeSubscription(Subscription subscription, short version) {
switch (version) {
case CONSUMER_PROTOCOL_V0:
return serializeSubscriptionV0(subscription);
@ -192,17 +171,17 @@ public class ConsumerProtocol { @@ -192,17 +171,17 @@ public class ConsumerProtocol {
}
}
public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
public static Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
Struct struct = SUBSCRIPTION_V0.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
topics.add((String) topicObj);
return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData);
return new Subscription(topics, userData, Collections.emptyList());
}
public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
public static Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
Struct struct = SUBSCRIPTION_V1.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
@ -218,10 +197,10 @@ public class ConsumerProtocol { @@ -218,10 +197,10 @@ public class ConsumerProtocol {
}
}
return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
return new Subscription(topics, userData, ownedPartitions);
}
public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
public static Subscription deserializeSubscription(ByteBuffer buffer) {
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
Short version = header.getShort(VERSION_KEY_NAME);
@ -241,7 +220,7 @@ public class ConsumerProtocol { @@ -241,7 +220,7 @@ public class ConsumerProtocol {
}
}
public static ByteBuffer serializeAssignmentV0(PartitionAssignor.Assignment assignment) {
public static ByteBuffer serializeAssignmentV0(Assignment assignment) {
Struct struct = new Struct(ASSIGNMENT_V0);
struct.set(USER_DATA_KEY_NAME, assignment.userData());
List<Struct> topicAssignments = new ArrayList<>();
@ -261,7 +240,7 @@ public class ConsumerProtocol { @@ -261,7 +240,7 @@ public class ConsumerProtocol {
return buffer;
}
public static ByteBuffer serializeAssignmentV1(PartitionAssignor.Assignment assignment) {
public static ByteBuffer serializeAssignmentV1(Assignment assignment) {
Struct struct = new Struct(ASSIGNMENT_V1);
struct.set(USER_DATA_KEY_NAME, assignment.userData());
List<Struct> topicAssignments = new ArrayList<>();
@ -273,7 +252,6 @@ public class ConsumerProtocol { @@ -273,7 +252,6 @@ public class ConsumerProtocol {
topicAssignments.add(topicAssignment);
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
struct.set(ERROR_CODE, assignment.error().code);
ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + ASSIGNMENT_V1.sizeOf(struct));
CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer);
@ -282,8 +260,12 @@ public class ConsumerProtocol { @@ -282,8 +260,12 @@ public class ConsumerProtocol {
return buffer;
}
public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment assignment) {
switch (assignment.version()) {
public static ByteBuffer serializeAssignment(Assignment assignment) {
return serializeAssignment(assignment, CONSUMER_PROTOCL_LATEST_VERSION);
}
public static ByteBuffer serializeAssignment(Assignment assignment, short version) {
switch (version) {
case CONSUMER_PROTOCOL_V0:
return serializeAssignmentV0(assignment);
@ -296,7 +278,7 @@ public class ConsumerProtocol { @@ -296,7 +278,7 @@ public class ConsumerProtocol {
}
}
public static PartitionAssignor.Assignment deserializeAssignmentV0(ByteBuffer buffer) {
public static Assignment deserializeAssignmentV0(ByteBuffer buffer) {
Struct struct = ASSIGNMENT_V0.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<TopicPartition> partitions = new ArrayList<>();
@ -307,27 +289,14 @@ public class ConsumerProtocol { @@ -307,27 +289,14 @@ public class ConsumerProtocol {
partitions.add(new TopicPartition(topic, (Integer) partitionObj));
}
}
return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V0, partitions, userData);
return new Assignment(partitions, userData);
}
public static PartitionAssignor.Assignment deserializeAssignmentV1(ByteBuffer buffer) {
Struct struct = ASSIGNMENT_V1.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<TopicPartition> partitions = new ArrayList<>();
for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {
Struct assignment = (Struct) structObj;
String topic = assignment.getString(TOPIC_KEY_NAME);
for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) {
partitions.add(new TopicPartition(topic, (Integer) partitionObj));
}
}
AssignmentError error = AssignmentError.fromCode(struct.get(ERROR_CODE));
return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V1, partitions, userData, error);
public static Assignment deserializeAssignmentV1(ByteBuffer buffer) {
return deserializeAssignmentV0(buffer);
}
public static PartitionAssignor.Assignment deserializeAssignment(ByteBuffer buffer) {
public static Assignment deserializeAssignment(ByteBuffer buffer) {
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
Short version = header.getShort(VERSION_KEY_NAME);

142
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java

@ -18,18 +18,12 @@ package org.apache.kafka.clients.consumer.internals; @@ -18,18 +18,12 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.SchemaException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V1;
/**
* This interface is used to define custom partition assignment for use in
* {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
@ -43,6 +37,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSU @@ -43,6 +37,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSU
* userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
* can use this user data to forward the rackId belonging to each member.
*/
@Deprecated
public interface PartitionAssignor {
/**
@ -79,21 +74,6 @@ public interface PartitionAssignor { @@ -79,21 +74,6 @@ public interface PartitionAssignor {
onAssignment(assignment);
}
/**
* Indicate which rebalance protocol this assignor works with;
* By default it should always work with {@link RebalanceProtocol#EAGER}.
*/
default List<RebalanceProtocol> supportedProtocols() {
return Collections.singletonList(RebalanceProtocol.EAGER);
}
/**
* Return the version of the assignor which indicates how the user metadata encodings
* and the assignment algorithm gets evolved.
*/
default short version() {
return (short) 0;
}
/**
* Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky")
@ -101,156 +81,52 @@ public interface PartitionAssignor { @@ -101,156 +81,52 @@ public interface PartitionAssignor {
*/
String name();
enum RebalanceProtocol {
EAGER((byte) 0), COOPERATIVE((byte) 1);
private final byte id;
RebalanceProtocol(byte id) {
this.id = id;
}
public byte id() {
return id;
}
public static RebalanceProtocol forId(byte id) {
switch (id) {
case 0:
return EAGER;
case 1:
return COOPERATIVE;
default:
throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
}
}
}
class Subscription {
private final Short version;
private final List<String> topics;
private final ByteBuffer userData;
private final List<TopicPartition> ownedPartitions;
private Optional<String> groupInstanceId;
Subscription(Short version,
List<String> topics,
ByteBuffer userData,
List<TopicPartition> ownedPartitions) {
this.version = version;
public Subscription(List<String> topics, ByteBuffer userData) {
this.topics = topics;
this.userData = userData;
this.ownedPartitions = ownedPartitions;
this.groupInstanceId = Optional.empty();
if (version < CONSUMER_PROTOCOL_V0)
throw new SchemaException("Unsupported subscription version: " + version);
if (version < CONSUMER_PROTOCOL_V1 && !ownedPartitions.isEmpty())
throw new IllegalArgumentException("Subscription version smaller than 1 should not have owned partitions");
}
Subscription(Short version, List<String> topics, ByteBuffer userData) {
this(version, topics, userData, Collections.emptyList());
}
public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
}
public Subscription(List<String> topics, ByteBuffer userData) {
this(CONSUMER_PROTOCOL_V1, topics, userData);
}
public Subscription(List<String> topics) {
this(topics, ByteBuffer.wrap(new byte[0]));
}
Short version() {
return version;
}
public List<String> topics() {
return topics;
}
public List<TopicPartition> ownedPartitions() {
return ownedPartitions;
}
public ByteBuffer userData() {
return userData;
}
public void setGroupInstanceId(Optional<String> groupInstanceId) {
this.groupInstanceId = groupInstanceId;
}
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
@Override
public String toString() {
return "Subscription(" +
"version=" + version +
", topics=" + topics +
", ownedPartitions=" + ownedPartitions +
", group.instance.id=" + groupInstanceId + ")";
"topics=" + topics +
')';
}
}
class Assignment {
private final Short version;
private List<TopicPartition> partitions;
private final List<TopicPartition> partitions;
private final ByteBuffer userData;
private ConsumerProtocol.AssignmentError error;
Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.AssignmentError error) {
this.version = version;
public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
this.partitions = partitions;
this.userData = userData;
this.error = error;
if (version < CONSUMER_PROTOCOL_V0)
throw new SchemaException("Unsupported subscription version: " + version);
if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.AssignmentError.NONE)
throw new IllegalArgumentException("Assignment version smaller than 1 should not have error code.");
}
Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData) {
this(version, partitions, userData, ConsumerProtocol.AssignmentError.NONE);
}
public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
this(CONSUMER_PROTOCOL_V1, partitions, userData);
}
public Assignment(List<TopicPartition> partitions) {
this(partitions, ByteBuffer.wrap(new byte[0]));
}
Short version() {
return version;
}
public List<TopicPartition> partitions() {
return partitions;
}
public ConsumerProtocol.AssignmentError error() {
return error;
}
public void updatePartitions(List<TopicPartition> partitions) {
this.partitions = partitions;
}
public void setError(ConsumerProtocol.AssignmentError error) {
this.error = error;
}
public ByteBuffer userData() {
return userData;
}
@ -258,10 +134,8 @@ public interface PartitionAssignor { @@ -258,10 +134,8 @@ public interface PartitionAssignor {
@Override
public String toString() {
return "Assignment(" +
"version=" + version +
", partitions=" + partitions +
", error=" + error +
')';
"partitions=" + partitions +
')';
}
}

8
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.ArrayList;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
@ -387,6 +388,13 @@ public class SubscriptionState { @@ -387,6 +388,13 @@ public class SubscriptionState {
return new HashSet<>(this.assignment.partitionSet());
}
/**
* @return a modifiable copy of the currently assigned partitions as a list
*/
public synchronized List<TopicPartition> assignedPartitionsList() {
return new ArrayList<>(this.assignment.partitionSet());
}
/**
* Provides the number of assigned partitions in a thread safe manner.
* @return the number of assigned partitions.

6
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -21,9 +21,9 @@ import org.apache.kafka.clients.ClientUtils; @@ -21,9 +21,9 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
@ -1222,7 +1222,7 @@ public class KafkaAdminClientTest { @@ -1222,7 +1222,7 @@ public class KafkaAdminClientTest {
topicPartitions.add(1, myTopicPartition1);
topicPartitions.add(2, myTopicPartition2);
final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
memberAssignment.get(memberAssignmentBytes);
@ -1282,7 +1282,7 @@ public class KafkaAdminClientTest { @@ -1282,7 +1282,7 @@ public class KafkaAdminClientTest {
topicPartitions.add(1, myTopicPartition1);
topicPartitions.add(2, myTopicPartition2);
final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
memberAssignment.get(memberAssignmentBytes);

69
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics; @@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@ -396,7 +395,7 @@ public class KafkaConsumerTest { @@ -396,7 +395,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
@ -430,7 +429,7 @@ public class KafkaConsumerTest { @@ -430,7 +429,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@ -465,7 +464,7 @@ public class KafkaConsumerTest { @@ -465,7 +464,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
final PartitionAssignor assignor = new RoundRobinAssignor();
final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@ -489,7 +488,7 @@ public class KafkaConsumerTest { @@ -489,7 +488,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
final PartitionAssignor assignor = new RoundRobinAssignor();
final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@ -512,7 +511,7 @@ public class KafkaConsumerTest { @@ -512,7 +511,7 @@ public class KafkaConsumerTest {
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singleton(tp0));
@ -587,7 +586,7 @@ public class KafkaConsumerTest { @@ -587,7 +586,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId);
@ -611,7 +610,7 @@ public class KafkaConsumerTest { @@ -611,7 +610,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId);
@ -636,7 +635,7 @@ public class KafkaConsumerTest { @@ -636,7 +635,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId);
@ -663,7 +662,7 @@ public class KafkaConsumerTest { @@ -663,7 +662,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
true, groupId, Optional.empty());
@ -686,7 +685,7 @@ public class KafkaConsumerTest { @@ -686,7 +685,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
@ -724,7 +723,7 @@ public class KafkaConsumerTest { @@ -724,7 +723,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@ -764,7 +763,7 @@ public class KafkaConsumerTest { @@ -764,7 +763,7 @@ public class KafkaConsumerTest {
initMetadata(client, partitionCounts);
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
@ -782,7 +781,7 @@ public class KafkaConsumerTest { @@ -782,7 +781,7 @@ public class KafkaConsumerTest {
@Test
public void testChangingRegexSubscription() {
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
String otherTopic = "other";
TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
@ -828,7 +827,7 @@ public class KafkaConsumerTest { @@ -828,7 +827,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@ -878,7 +877,7 @@ public class KafkaConsumerTest { @@ -878,7 +877,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
final PartitionAssignor assignor = new RoundRobinAssignor();
final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@ -908,7 +907,7 @@ public class KafkaConsumerTest { @@ -908,7 +907,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RangeAssignor();
ConsumerPartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
@ -948,7 +947,7 @@ public class KafkaConsumerTest { @@ -948,7 +947,7 @@ public class KafkaConsumerTest {
initMetadata(client, tpCounts);
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RangeAssignor();
ConsumerPartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
@ -1062,7 +1061,7 @@ public class KafkaConsumerTest { @@ -1062,7 +1061,7 @@ public class KafkaConsumerTest {
initMetadata(client, tpCounts);
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RangeAssignor();
ConsumerPartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
@ -1124,7 +1123,7 @@ public class KafkaConsumerTest { @@ -1124,7 +1123,7 @@ public class KafkaConsumerTest {
initMetadata(client, tpCounts);
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RangeAssignor();
ConsumerPartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
@ -1180,7 +1179,7 @@ public class KafkaConsumerTest { @@ -1180,7 +1179,7 @@ public class KafkaConsumerTest {
initMetadata(client, tpCounts);
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RangeAssignor();
ConsumerPartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
@ -1234,7 +1233,7 @@ public class KafkaConsumerTest { @@ -1234,7 +1233,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RangeAssignor();
ConsumerPartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
@ -1429,7 +1428,7 @@ public class KafkaConsumerTest { @@ -1429,7 +1428,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@ -1457,7 +1456,7 @@ public class KafkaConsumerTest { @@ -1457,7 +1456,7 @@ public class KafkaConsumerTest {
coordinator);
// join group
final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic)));
final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(singletonList(topic)));
// This member becomes the leader
String memberId = "memberId";
@ -1512,7 +1511,7 @@ public class KafkaConsumerTest { @@ -1512,7 +1511,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty());
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@ -1649,7 +1648,7 @@ public class KafkaConsumerTest { @@ -1649,7 +1648,7 @@ public class KafkaConsumerTest {
initMetadata(client, singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RangeAssignor();
ConsumerPartitionAssignor assignor = new RangeAssignor();
client.createPendingAuthenticationError(node, 0);
return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
@ -1675,7 +1674,7 @@ public class KafkaConsumerTest { @@ -1675,7 +1674,7 @@ public class KafkaConsumerTest {
subscription, new LogContext(), new ClusterResourceListeners());
}
private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, ConsumerPartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
@ -1692,7 +1691,7 @@ public class KafkaConsumerTest { @@ -1692,7 +1691,7 @@ public class KafkaConsumerTest {
assertTrue(protocolIterator.hasNext());
ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
return subscribedTopics.equals(new HashSet<>(subscription.topics()));
}
}, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
@ -1703,7 +1702,7 @@ public class KafkaConsumerTest { @@ -1703,7 +1702,7 @@ public class KafkaConsumerTest {
return coordinator;
}
private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
private Node prepareRebalance(MockClient client, Node node, ConsumerPartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
@ -1764,7 +1763,7 @@ public class KafkaConsumerTest { @@ -1764,7 +1763,7 @@ public class KafkaConsumerTest {
return new OffsetCommitResponse(responseData);
}
private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) {
private JoinGroupResponse joinGroupFollowerResponse(ConsumerPartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) {
return new JoinGroupResponse(
new JoinGroupResponseData()
.setErrorCode(error.code())
@ -1777,7 +1776,7 @@ public class KafkaConsumerTest { @@ -1777,7 +1776,7 @@ public class KafkaConsumerTest {
}
private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions));
return new SyncGroupResponse(
new SyncGroupResponseData()
.setErrorCode(error.code())
@ -1848,7 +1847,7 @@ public class KafkaConsumerTest { @@ -1848,7 +1847,7 @@ public class KafkaConsumerTest {
KafkaClient client,
SubscriptionState subscription,
ConsumerMetadata metadata,
PartitionAssignor assignor,
ConsumerPartitionAssignor assignor,
boolean autoCommitEnabled,
Optional<String> groupInstanceId) {
return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId);
@ -1865,7 +1864,7 @@ public class KafkaConsumerTest { @@ -1865,7 +1864,7 @@ public class KafkaConsumerTest {
KafkaClient client,
SubscriptionState subscription,
ConsumerMetadata metadata,
PartitionAssignor assignor,
ConsumerPartitionAssignor assignor,
boolean autoCommitEnabled,
String groupId,
Optional<String> groupInstanceId) {
@ -1885,7 +1884,7 @@ public class KafkaConsumerTest { @@ -1885,7 +1884,7 @@ public class KafkaConsumerTest {
Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<String> valueDeserializer = new StringDeserializer();
List<PartitionAssignor> assignors = singletonList(assignor);
List<ConsumerPartitionAssignor> assignors = singletonList(assignor);
ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.emptyList());
Metrics metrics = new Metrics();
@ -1985,7 +1984,7 @@ public class KafkaConsumerTest { @@ -1985,7 +1984,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Cluster cluster = metadata.fetch();
PartitionAssignor assignor = new RoundRobinAssignor();
ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
String invalidTopicName = "topic abc"; // Invalid topic name due to space

2
clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java

@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

2
clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java

@ -17,7 +17,7 @@ @@ -17,7 +17,7 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

2
clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java

@ -30,7 +30,7 @@ import java.util.Random; @@ -30,7 +30,7 @@ import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.consumer.StickyAssignor.ConsumerUserData;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;

31
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientResponse; @@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@ -125,9 +126,9 @@ public class ConsumerCoordinatorTest { @@ -125,9 +126,9 @@ public class ConsumerCoordinatorTest {
private final MockTime time = new MockTime();
private GroupRebalanceConfig rebalanceConfig;
private final PartitionAssignor.RebalanceProtocol protocol;
private final ConsumerPartitionAssignor.RebalanceProtocol protocol;
private final MockPartitionAssignor partitionAssignor;
private final List<PartitionAssignor> assignors;
private final List<ConsumerPartitionAssignor> assignors;
private MockClient client;
private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
{
@ -144,7 +145,7 @@ public class ConsumerCoordinatorTest { @@ -144,7 +145,7 @@ public class ConsumerCoordinatorTest {
private MockCommitCallback mockOffsetCommitCallback;
private ConsumerCoordinator coordinator;
public ConsumerCoordinatorTest(final PartitionAssignor.RebalanceProtocol protocol) {
public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) {
this.protocol = protocol;
this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol));
this.assignors = Collections.singletonList(partitionAssignor);
@ -153,7 +154,7 @@ public class ConsumerCoordinatorTest { @@ -153,7 +154,7 @@ public class ConsumerCoordinatorTest {
@Parameterized.Parameters(name = "rebalance protocol = {0}")
public static Collection<Object[]> data() {
final List<Object[]> values = new ArrayList<>();
for (final PartitionAssignor.RebalanceProtocol protocol: PartitionAssignor.RebalanceProtocol.values()) {
for (final ConsumerPartitionAssignor.RebalanceProtocol protocol: ConsumerPartitionAssignor.RebalanceProtocol.values()) {
values.add(new Object[]{protocol});
}
return values;
@ -198,20 +199,20 @@ public class ConsumerCoordinatorTest { @@ -198,20 +199,20 @@ public class ConsumerCoordinatorTest {
@Test
public void testSelectRebalanceProtcol() {
List<PartitionAssignor> assignors = new ArrayList<>();
assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.EAGER)));
assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER)));
assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
// no commonly supported protocols
assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
assignors.clear();
assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
// select higher indexed (more advanced) protocols
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
assertEquals(PartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol());
assertEquals(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol());
}
}
@ -553,7 +554,7 @@ public class ConsumerCoordinatorTest { @@ -553,7 +554,7 @@ public class ConsumerCoordinatorTest {
final int addCount = 1;
// with eager protocol we will call revoke on the old assignment as well
if (protocol == PartitionAssignor.RebalanceProtocol.EAGER) {
if (protocol == ConsumerPartitionAssignor.RebalanceProtocol.EAGER) {
revokeCount += 1;
}
@ -670,7 +671,7 @@ public class ConsumerCoordinatorTest { @@ -670,7 +671,7 @@ public class ConsumerCoordinatorTest {
JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
metadata.rewind();
return subscription.topics().containsAll(updatedSubscription);
}
@ -2326,7 +2327,7 @@ public class ConsumerCoordinatorTest { @@ -2326,7 +2327,7 @@ public class ConsumerCoordinatorTest {
private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig,
final Metrics metrics,
final List<PartitionAssignor> assignors,
final List<ConsumerPartitionAssignor> assignors,
final boolean autoCommitEnabled) {
return new ConsumerCoordinator(
rebalanceConfig,
@ -2385,7 +2386,7 @@ public class ConsumerCoordinatorTest { @@ -2385,7 +2386,7 @@ public class ConsumerCoordinatorTest {
Errors error) {
List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId(subscriptionEntry.getKey())
@ -2416,7 +2417,7 @@ public class ConsumerCoordinatorTest { @@ -2416,7 +2417,7 @@ public class ConsumerCoordinatorTest {
}
private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions));
return new SyncGroupResponse(
new SyncGroupResponseData()
.setErrorCode(error.code())

45
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java

@ -16,8 +16,8 @@ @@ -16,8 +16,8 @@
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
@ -39,7 +39,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC @@ -39,7 +39,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_DATA_KEY_NAME;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.test.TestUtils.toSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -54,7 +53,7 @@ public class ConsumerProtocolTest { @@ -54,7 +53,7 @@ public class ConsumerProtocolTest {
@Test
public void serializeDeserializeMetadata() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0]));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(subscription.topics(), parsedSubscription.topics());
@ -64,7 +63,7 @@ public class ConsumerProtocolTest { @@ -64,7 +63,7 @@ public class ConsumerProtocolTest {
@Test
public void serializeDeserializeMetadataAndGroupInstanceId() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0]));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
@ -85,8 +84,8 @@ public class ConsumerProtocolTest { @@ -85,8 +84,8 @@ public class ConsumerProtocolTest {
@Test
public void deserializeOldSubscriptionVersion() {
Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscriptionV0(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
@ -95,7 +94,7 @@ public class ConsumerProtocolTest { @@ -95,7 +94,7 @@ public class ConsumerProtocolTest {
@Test
public void deserializeNewSubscriptionWithOldVersion() {
Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
// ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
@ -145,7 +144,7 @@ public class ConsumerProtocolTest { @@ -145,7 +144,7 @@ public class ConsumerProtocolTest {
@Test
public void serializeDeserializeAssignment() {
List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment(partitions));
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment(partitions, ByteBuffer.wrap(new byte[0])));
Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
assertEquals(0, parsedAssignment.userData().limit());
@ -160,29 +159,6 @@ public class ConsumerProtocolTest { @@ -160,29 +159,6 @@ public class ConsumerProtocolTest {
assertNull(parsedAssignment.userData());
}
@Test
public void deserializeOldAssignmentVersion() {
List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 0, partitions, null));
Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
assertNull(parsedAssignment.userData());
assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error());
}
@Test
public void deserializeNewAssignmentWithOldVersion() {
List<TopicPartition> partitions = Collections.singletonList(tp1);
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 1, partitions, null, ConsumerProtocol.AssignmentError.NEED_REJOIN));
// ignore the version assuming it is the old byte code, as it will blindly deserialize as 0
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
header.getShort(VERSION_KEY_NAME);
Assignment parsedAssignment = ConsumerProtocol.deserializeAssignmentV0(buffer);
assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
assertNull(parsedAssignment.userData());
assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error());
}
@Test
public void deserializeFutureAssignmentVersion() {
// verify that a new version which adds a field is still parseable
@ -191,7 +167,6 @@ public class ConsumerProtocolTest { @@ -191,7 +167,6 @@ public class ConsumerProtocolTest {
Schema assignmentSchemaV100 = new Schema(
new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
new Field(USER_DATA_KEY_NAME, Type.BYTES),
ERROR_CODE,
new Field("foo", Type.STRING));
Struct assignmentV100 = new Struct(assignmentSchemaV100);
@ -200,7 +175,6 @@ public class ConsumerProtocolTest { @@ -200,7 +175,6 @@ public class ConsumerProtocolTest {
.set(ConsumerProtocol.TOPIC_KEY_NAME, tp1.topic())
.set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{tp1.partition()})});
assignmentV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
assignmentV100.set(ERROR_CODE.name, ConsumerProtocol.AssignmentError.NEED_REJOIN.code());
assignmentV100.set("foo", "bar");
Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA);
@ -212,8 +186,7 @@ public class ConsumerProtocolTest { @@ -212,8 +186,7 @@ public class ConsumerProtocolTest {
buffer.flip();
PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
assertEquals(toSet(Collections.singletonList(tp1)), toSet(assignment.partitions()));
assertEquals(ConsumerProtocol.AssignmentError.NEED_REJOIN, assignment.error());
}
}

2
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala

@ -31,8 +31,8 @@ import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManag @@ -31,8 +31,8 @@ import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManag
import kafka.server.HostedPartition
import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors

20
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

@ -16,8 +16,10 @@ @@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.nio.ByteBuffer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
public class StreamsPartitionAssignor implements PartitionAssignor, Configurable {
public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
final static int UNKNOWN = -1;
private final static int VERSION_ONE = 1;
@ -309,7 +311,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable @@ -309,7 +311,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
@Override
public Subscription subscription(final Set<String> topics) {
public ByteBuffer subscriptionUserData(final Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
@ -327,7 +329,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable @@ -327,7 +329,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
taskManager.updateSubscriptionsFromMetadata(topics);
return new Subscription(new ArrayList<>(topics), data.encode());
return data.encode();
}
private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
@ -371,8 +373,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable @@ -371,8 +373,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
* 3. within each client, tasks are assigned to consumer clients in round-robin manner.
*/
@Override
public Map<String, Assignment> assign(final Cluster metadata,
final Map<String, Subscription> subscriptions) {
public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscriptions) {
final Map<String, Subscription> subscriptions = groupSubscriptions.groupSubscription();
// construct the client metadata from the decoded subscription info
final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
final Set<String> futureConsumers = new HashSet<>();
@ -446,7 +448,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable @@ -446,7 +448,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
!metadata.topics().contains(topic)) {
log.error("Missing source topic {} durign assignment. Returning error {}.",
topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
return errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
return new GroupAssignment(errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code));
}
}
for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
@ -644,7 +646,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable @@ -644,7 +646,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
assignment = computeNewAssignment(clientMetadataMap, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion);
}
return assignment;
return new GroupAssignment(assignment);
}
private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
@ -777,7 +779,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable @@ -777,7 +779,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
@Override
public void onAssignment(final Assignment assignment) {
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
partitions.sort(PARTITION_COMPARATOR);

126
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java

@ -16,7 +16,8 @@ @@ -16,7 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@ -146,7 +147,7 @@ public class StreamsPartitionAssignorTest { @@ -146,7 +147,7 @@ public class StreamsPartitionAssignorTest {
EasyMock.replay(taskManager);
}
private Map<String, PartitionAssignor.Subscription> subscriptions;
private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions;
@Before
public void setUp() {
@ -200,7 +201,9 @@ public class StreamsPartitionAssignorTest { @@ -200,7 +201,9 @@ public class StreamsPartitionAssignorTest {
mockTaskManager(prevTasks, cachedTasks, processId, builder);
configurePartitionAssignor(Collections.emptyMap());
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
final Set<String> topics = Utils.mkSet("topic1", "topic2");
final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
Collections.sort(subscription.topics());
assertEquals(asList("topic1", "topic2"), subscription.topics());
@ -236,16 +239,16 @@ public class StreamsPartitionAssignorTest { @@ -236,16 +239,16 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
subscriptions.put("consumer11",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
subscriptions.put("consumer20",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// check assigned partitions
assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
@ -320,13 +323,13 @@ public class StreamsPartitionAssignorTest { @@ -320,13 +323,13 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
subscriptions.put("consumer11",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, new GroupSubscription(subscriptions)).groupAssignment();
// check assigned partitions
assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)),
@ -365,10 +368,10 @@ public class StreamsPartitionAssignorTest { @@ -365,10 +368,10 @@ public class StreamsPartitionAssignorTest {
// will throw exception if it fails
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()
));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// check assignment info
final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
@ -399,12 +402,12 @@ public class StreamsPartitionAssignorTest { @@ -399,12 +402,12 @@ public class StreamsPartitionAssignorTest {
configurePartitionAssignor(Collections.emptyMap());
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()
));
// initially metadata is empty
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions);
Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, new GroupSubscription(subscriptions)).groupAssignment();
// check assigned partitions
assertEquals(Collections.emptySet(),
@ -417,7 +420,7 @@ public class StreamsPartitionAssignorTest { @@ -417,7 +420,7 @@ public class StreamsPartitionAssignorTest {
assertEquals(0, allActiveTasks.size());
// then metadata gets populated
assignments = partitionAssignor.assign(metadata, subscriptions);
assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// check assigned partitions
assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)),
Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions())));
@ -455,16 +458,16 @@ public class StreamsPartitionAssignorTest { @@ -455,16 +458,16 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, prevTasks10, emptyTasks, userEndPoint).encode()));
subscriptions.put("consumer11",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, prevTasks11, emptyTasks, userEndPoint).encode()));
subscriptions.put("consumer20",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid2, prevTasks20, emptyTasks, userEndPoint).encode()));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
// also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
@ -521,16 +524,16 @@ public class StreamsPartitionAssignorTest { @@ -521,16 +524,16 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
subscriptions.put("consumer11",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
subscriptions.put("consumer20",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid2, emptyTasks, emptyTasks, userEndPoint).encode()));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match
assertEquals(2, assignments.get("consumer10").partitions().size());
@ -609,16 +612,16 @@ public class StreamsPartitionAssignorTest { @@ -609,16 +612,16 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode()));
subscriptions.put("consumer11",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode()));
subscriptions.put("consumer20",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode()));
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// the first consumer
final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
@ -666,7 +669,7 @@ public class StreamsPartitionAssignorTest { @@ -666,7 +669,7 @@ public class StreamsPartitionAssignorTest {
standbyTasks.put(task2, Utils.mkSet(t3p2));
final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState);
final PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode());
final ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode());
final Capture<Cluster> capturedCluster = EasyMock.newCapture();
taskManager.setPartitionsByHostState(hostState);
@ -677,7 +680,7 @@ public class StreamsPartitionAssignorTest { @@ -677,7 +680,7 @@ public class StreamsPartitionAssignorTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager);
partitionAssignor.onAssignment(assignment);
partitionAssignor.onAssignment(assignment, null);
EasyMock.verify(taskManager);
@ -704,10 +707,10 @@ public class StreamsPartitionAssignorTest { @@ -704,10 +707,10 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(internalTopicManager);
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
);
partitionAssignor.assign(metadata, subscriptions);
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// check prepared internal topics
assertEquals(1, internalTopicManager.readyTopics.size());
@ -738,10 +741,10 @@ public class StreamsPartitionAssignorTest { @@ -738,10 +741,10 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(internalTopicManager);
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
);
partitionAssignor.assign(metadata, subscriptions);
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
// check prepared internal topics
assertEquals(2, internalTopicManager.readyTopics.size());
@ -790,11 +793,11 @@ public class StreamsPartitionAssignorTest { @@ -790,11 +793,11 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
subscriptions.put(client,
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
asList("topic1", "topic3"),
new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
);
final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
@ -841,7 +844,8 @@ public class StreamsPartitionAssignorTest { @@ -841,7 +844,8 @@ public class StreamsPartitionAssignorTest {
uuid1,
builder);
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
final Set<String> topics = Utils.mkSet("input");
final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
}
@ -863,11 +867,11 @@ public class StreamsPartitionAssignorTest { @@ -863,11 +867,11 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
subscriptions.put("consumer1",
new PartitionAssignor.Subscription(topics,
new ConsumerPartitionAssignor.Subscription(topics,
new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
);
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
final ConsumerPartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
assertEquals(
@ -961,11 +965,11 @@ public class StreamsPartitionAssignorTest { @@ -961,11 +965,11 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
subscriptions.put(client,
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("unknownTopic"),
new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
);
final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true));
@ -985,7 +989,7 @@ public class StreamsPartitionAssignorTest { @@ -985,7 +989,7 @@ public class StreamsPartitionAssignorTest {
EasyMock.expectLastCall();
EasyMock.replay(taskManager);
partitionAssignor.onAssignment(createAssignment(hostState));
partitionAssignor.onAssignment(createAssignment(hostState), null);
EasyMock.verify(taskManager);
}
@ -1015,18 +1019,18 @@ public class StreamsPartitionAssignorTest { @@ -1015,18 +1019,18 @@ public class StreamsPartitionAssignorTest {
mockClientSupplier.restoreConsumer));
subscriptions.put("consumer1",
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
);
subscriptions.put("consumer2",
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode())
);
final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
final Map<String, ConsumerPartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
final ConsumerPartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090));
@ -1109,12 +1113,12 @@ public class StreamsPartitionAssignorTest { @@ -1109,12 +1113,12 @@ public class StreamsPartitionAssignorTest {
private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion,
final int otherVersion) {
subscriptions.put("consumer1",
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode())
);
subscriptions.put("consumer2",
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
)
@ -1126,7 +1130,7 @@ public class StreamsPartitionAssignorTest { @@ -1126,7 +1130,7 @@ public class StreamsPartitionAssignorTest {
UUID.randomUUID(),
builder);
partitionAssignor.configure(configProps());
final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
assertThat(assignment.size(), equalTo(2));
assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion));
@ -1142,7 +1146,8 @@ public class StreamsPartitionAssignorTest { @@ -1142,7 +1146,8 @@ public class StreamsPartitionAssignorTest {
builder);
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100));
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
final Set<String> topics = Utils.mkSet("topic1");
final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1));
}
@ -1180,7 +1185,8 @@ public class StreamsPartitionAssignorTest { @@ -1180,7 +1185,8 @@ public class StreamsPartitionAssignorTest {
builder);
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue));
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
final Set<String> topics = Utils.mkSet("topic1");
final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2));
}
@ -1200,12 +1206,12 @@ public class StreamsPartitionAssignorTest { @@ -1200,12 +1206,12 @@ public class StreamsPartitionAssignorTest {
};
subscriptions.put("consumer1",
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode())
);
subscriptions.put("future-consumer",
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
encodeFutureSubscription())
);
@ -1216,7 +1222,7 @@ public class StreamsPartitionAssignorTest { @@ -1216,7 +1222,7 @@ public class StreamsPartitionAssignorTest {
UUID.randomUUID(),
builder);
partitionAssignor.configure(configProps());
final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
assertThat(assignment.size(), equalTo(2));
assertThat(
@ -1252,12 +1258,12 @@ public class StreamsPartitionAssignorTest { @@ -1252,12 +1258,12 @@ public class StreamsPartitionAssignorTest {
private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
subscriptions.put("consumer1",
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode())
);
subscriptions.put("future-consumer",
new PartitionAssignor.Subscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList("topic1"),
encodeFutureSubscription())
);
@ -1270,24 +1276,24 @@ public class StreamsPartitionAssignorTest { @@ -1270,24 +1276,24 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.configure(configProps());
try {
partitionAssignor.assign(metadata, subscriptions);
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
fail("Should have thrown IllegalStateException");
} catch (final IllegalStateException expected) {
// pass
}
}
private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
private ConsumerPartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(),
Collections.emptyMap(),
firstHostState);
return new PartitionAssignor.Assignment(
return new ConsumerPartitionAssignor.Assignment(
Collections.emptyList(), info.encode());
}
private AssignmentInfo checkAssignment(final Set<String> expectedTopics,
final PartitionAssignor.Assignment assignment) {
final ConsumerPartitionAssignor.Assignment assignment) {
// This assumed 1) DefaultPartitionGrouper is used, and 2) there is an only one topic group.

21
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -18,8 +18,9 @@ package org.apache.kafka.streams.tests; @@ -18,8 +18,9 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@ -113,7 +114,7 @@ public class StreamsUpgradeTest { @@ -113,7 +114,7 @@ public class StreamsUpgradeTest {
}
@Override
public Subscription subscription(final Set<String> topics) {
public ByteBuffer subscriptionUserData(final Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
@ -133,13 +134,13 @@ public class StreamsUpgradeTest { @@ -133,13 +134,13 @@ public class StreamsUpgradeTest {
taskManager.updateSubscriptionsFromMetadata(topics);
return new Subscription(new ArrayList<>(topics), data.encode());
return data.encode();
}
@Override
public void onAssignment(final PartitionAssignor.Assignment assignment) {
public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, final ConsumerGroupMetadata metadata) {
try {
super.onAssignment(assignment);
super.onAssignment(assignment, metadata);
return;
} catch (final TaskAssignmentException cannotProcessFutureVersion) {
// continue
@ -183,15 +184,15 @@ public class StreamsUpgradeTest { @@ -183,15 +184,15 @@ public class StreamsUpgradeTest {
}
@Override
public Map<String, Assignment> assign(final Cluster metadata,
final Map<String, Subscription> subscriptions) {
public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) {
final Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
Map<String, Assignment> assignment = null;
final Map<String, Subscription> downgradedSubscriptions = new HashMap<>();
for (final Subscription subscription : subscriptions.values()) {
final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
if (info.version() < SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1) {
assignment = super.assign(metadata, subscriptions);
assignment = super.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
break;
}
}
@ -219,7 +220,7 @@ public class StreamsUpgradeTest { @@ -219,7 +220,7 @@ public class StreamsUpgradeTest {
info.userEndPoint())
.encode()));
}
assignment = super.assign(metadata, downgradedSubscriptions);
assignment = super.assign(metadata, new GroupSubscription(downgradedSubscriptions)).groupAssignment();
bumpUsedVersion = true;
bumpSupportedVersion = true;
}
@ -238,7 +239,7 @@ public class StreamsUpgradeTest { @@ -238,7 +239,7 @@ public class StreamsUpgradeTest {
.encode()));
}
return newAssignment;
return new GroupAssignment(newAssignment);
}
}

Loading…
Cancel
Save