Browse Source

KAFKA-8539; Add group.instance.id to Subscription (#6936)

This PR is part of KIP-345's effort to utilize this new field for more stable topic partition assignment. We add the group instance id to the `Subscription` object to allow partition assignors to make stickier assignments. More details [here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances#KIP-345:Introducestaticmembershipprotocoltoreduceconsumerrebalances-ClientBehaviorChanges). 

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6951/head
Boyang Chen 5 years ago committed by Jason Gustafson
parent
commit
47f908fa73
  1. 7
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
  2. 30
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
  3. 24
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
  4. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  5. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
  6. 37
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java

7
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

@ -388,9 +388,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -388,9 +388,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
for (JoinGroupResponseData.JoinGroupResponseMember memberSubScription : allSubscriptions) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubScription.metadata()));
subscriptions.put(memberSubScription.memberId(), subscription);
for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
Optional.ofNullable(memberSubscription.groupInstanceId()));
subscriptions.put(memberSubscription.memberId(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}

30
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java

@ -27,8 +27,10 @@ import org.apache.kafka.common.utils.CollectionUtils; @@ -27,8 +27,10 @@ import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@ -177,17 +179,22 @@ public class ConsumerProtocol { @@ -177,17 +179,22 @@ public class ConsumerProtocol {
}
}
public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer,
Optional<String> groupInstanceId) {
Struct struct = SUBSCRIPTION_V0.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
topics.add((String) topicObj);
return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData);
return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0,
topics,
userData,
Collections.emptyList(),
groupInstanceId);
}
public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer,
Optional<String> groupInstanceId) {
Struct struct = SUBSCRIPTION_V1.read(buffer);
ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
List<String> topics = new ArrayList<>();
@ -203,10 +210,15 @@ public class ConsumerProtocol { @@ -203,10 +210,15 @@ public class ConsumerProtocol {
}
}
return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1,
topics,
userData,
ownedPartitions,
groupInstanceId);
}
public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer,
Optional<String> groupInstanceId) {
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
Short version = header.getShort(VERSION_KEY_NAME);
@ -215,14 +227,14 @@ public class ConsumerProtocol { @@ -215,14 +227,14 @@ public class ConsumerProtocol {
switch (version) {
case CONSUMER_PROTOCOL_V0:
return deserializeSubscriptionV0(buffer);
return deserializeSubscriptionV0(buffer, groupInstanceId);
case CONSUMER_PROTOCOL_V1:
return deserializeSubscriptionV1(buffer);
return deserializeSubscriptionV1(buffer, groupInstanceId);
// assume all higher versions can be parsed as V1
default:
return deserializeSubscriptionV1(buffer);
return deserializeSubscriptionV1(buffer, groupInstanceId);
}
}

24
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java

@ -24,6 +24,7 @@ import java.nio.ByteBuffer; @@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0;
@ -130,12 +131,18 @@ public interface PartitionAssignor { @@ -130,12 +131,18 @@ public interface PartitionAssignor {
private final List<String> topics;
private final ByteBuffer userData;
private final List<TopicPartition> ownedPartitions;
private final Optional<String> groupInstanceId;
Subscription(Short version, List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
Subscription(Short version,
List<String> topics,
ByteBuffer userData,
List<TopicPartition> ownedPartitions,
Optional<String> groupInstanceId) {
this.version = version;
this.topics = topics;
this.userData = userData;
this.ownedPartitions = ownedPartitions;
this.groupInstanceId = groupInstanceId;
if (version < CONSUMER_PROTOCOL_V0)
throw new SchemaException("Unsupported subscription version: " + version);
@ -145,11 +152,14 @@ public interface PartitionAssignor { @@ -145,11 +152,14 @@ public interface PartitionAssignor {
}
Subscription(Short version, List<String> topics, ByteBuffer userData) {
this(version, topics, userData, Collections.emptyList());
this(version, topics, userData, Collections.emptyList(), Optional.empty());
}
public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
public Subscription(List<String> topics,
ByteBuffer userData,
List<TopicPartition> ownedPartitions,
Optional<String> groupInstanceId) {
this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions, groupInstanceId);
}
public Subscription(List<String> topics, ByteBuffer userData) {
@ -176,13 +186,17 @@ public interface PartitionAssignor { @@ -176,13 +186,17 @@ public interface PartitionAssignor {
return userData;
}
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
@Override
public String toString() {
return "Subscription(" +
"version=" + version +
", topics=" + topics +
", ownedPartitions=" + ownedPartitions +
')';
", group.instance.id=" + groupInstanceId + ")";
}
}

2
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

@ -1692,7 +1692,7 @@ public class KafkaConsumerTest { @@ -1692,7 +1692,7 @@ public class KafkaConsumerTest {
assertTrue(protocolIterator.hasNext());
ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata, Optional.empty());
return subscribedTopics.equals(new HashSet<>(subscription.topics()));
}
}, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);

2
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

@ -600,7 +600,7 @@ public class ConsumerCoordinatorTest { @@ -600,7 +600,7 @@ public class ConsumerCoordinatorTest {
JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata, Optional.empty());
metadata.rewind();
return subscription.topics().containsAll(updatedSubscriptionSet);
}

37
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java

@ -33,6 +33,7 @@ import java.util.Collection; @@ -33,6 +33,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA;
@ -44,6 +45,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_ @@ -44,6 +45,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_
import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -51,46 +53,64 @@ public class ConsumerProtocolTest { @@ -51,46 +53,64 @@ public class ConsumerProtocolTest {
private final TopicPartition tp1 = new TopicPartition("foo", 1);
private final TopicPartition tp2 = new TopicPartition("bar", 2);
private final Optional<String> groupInstanceId = Optional.of("instance.id");
@Test
public void serializeDeserializeMetadata() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertEquals(0, parsedSubscription.userData().limit());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
}
@Test
public void serializeDeserializeMetadataAndGroupInstanceId() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
assertEquals(subscription.topics(), parsedSubscription.topics());
assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@Test
public void serializeDeserializeNullSubscriptionUserData() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
}
@Test
public void deserializeOldSubscriptionVersion() {
Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@Test
public void deserializeNewSubscriptionWithOldVersion() {
Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
Subscription subscription = new Subscription((short) 1,
Arrays.asList("foo", "bar"),
null, Collections.singletonList(tp2),
Optional.empty());
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
// ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
header.getShort(VERSION_KEY_NAME);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer, Optional.empty());
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(parsedSubscription.userData());
assertTrue(parsedSubscription.ownedPartitions().isEmpty());
assertFalse(parsedSubscription.groupInstanceId().isPresent());
}
@Test
@ -121,9 +141,10 @@ public class ConsumerProtocolTest { @@ -121,9 +141,10 @@ public class ConsumerProtocolTest {
buffer.flip();
Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(Collections.singletonList("topic"), subscription.topics());
assertEquals(Collections.singletonList(tp2), subscription.ownedPartitions());
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId);
assertEquals(Collections.singletonList("topic"), parsedSubscription.topics());
assertEquals(Collections.singletonList(tp2), parsedSubscription.ownedPartitions());
assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
}
@Test

Loading…
Cancel
Save