diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index a590a1e794e..b2b6f969843 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -388,9 +388,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { Set allSubscribedTopics = new HashSet<>(); Map subscriptions = new HashMap<>(); - for (JoinGroupResponseData.JoinGroupResponseMember memberSubScription : allSubscriptions) { - Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubScription.metadata())); - subscriptions.put(memberSubScription.memberId(), subscription); + for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) { + Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()), + Optional.ofNullable(memberSubscription.groupInstanceId())); + subscriptions.put(memberSubscription.memberId(), subscription); allSubscribedTopics.addAll(subscription.topics()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index b4ad4514eb6..d3737f79e35 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -27,8 +27,10 @@ import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; @@ -177,17 +179,22 @@ public class ConsumerProtocol { } } - public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) { + public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer, + Optional groupInstanceId) { Struct struct = SUBSCRIPTION_V0.read(buffer); ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); List topics = new ArrayList<>(); for (Object topicObj : struct.getArray(TOPICS_KEY_NAME)) topics.add((String) topicObj); - - return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData); + return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, + topics, + userData, + Collections.emptyList(), + groupInstanceId); } - public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) { + public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer, + Optional groupInstanceId) { Struct struct = SUBSCRIPTION_V1.read(buffer); ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME); List topics = new ArrayList<>(); @@ -203,10 +210,15 @@ public class ConsumerProtocol { } } - return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions); + return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, + topics, + userData, + ownedPartitions, + groupInstanceId); } - public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) { + public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer, + Optional groupInstanceId) { Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); Short version = header.getShort(VERSION_KEY_NAME); @@ -215,14 +227,14 @@ public class ConsumerProtocol { switch (version) { case CONSUMER_PROTOCOL_V0: - return deserializeSubscriptionV0(buffer); + return deserializeSubscriptionV0(buffer, groupInstanceId); case CONSUMER_PROTOCOL_V1: - return deserializeSubscriptionV1(buffer); + return deserializeSubscriptionV1(buffer, groupInstanceId); // assume all higher versions can be parsed as V1 default: - return deserializeSubscriptionV1(buffer); + return deserializeSubscriptionV1(buffer, groupInstanceId); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java index 5c76fd66dfe..921a55bd715 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0; @@ -130,12 +131,18 @@ public interface PartitionAssignor { private final List topics; private final ByteBuffer userData; private final List ownedPartitions; + private final Optional groupInstanceId; - Subscription(Short version, List topics, ByteBuffer userData, List ownedPartitions) { + Subscription(Short version, + List topics, + ByteBuffer userData, + List ownedPartitions, + Optional groupInstanceId) { this.version = version; this.topics = topics; this.userData = userData; this.ownedPartitions = ownedPartitions; + this.groupInstanceId = groupInstanceId; if (version < CONSUMER_PROTOCOL_V0) throw new SchemaException("Unsupported subscription version: " + version); @@ -145,11 +152,14 @@ public interface PartitionAssignor { } Subscription(Short version, List topics, ByteBuffer userData) { - this(version, topics, userData, Collections.emptyList()); + this(version, topics, userData, Collections.emptyList(), Optional.empty()); } - public Subscription(List topics, ByteBuffer userData, List ownedPartitions) { - this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions); + public Subscription(List topics, + ByteBuffer userData, + List ownedPartitions, + Optional groupInstanceId) { + this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions, groupInstanceId); } public Subscription(List topics, ByteBuffer userData) { @@ -176,13 +186,17 @@ public interface PartitionAssignor { return userData; } + public Optional groupInstanceId() { + return groupInstanceId; + } + @Override public String toString() { return "Subscription(" + "version=" + version + ", topics=" + topics + ", ownedPartitions=" + ownedPartitions + - ')'; + ", group.instance.id=" + groupInstanceId + ")"; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index c1adf1932ec..cff71c34e45 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1692,7 +1692,7 @@ public class KafkaConsumerTest { assertTrue(protocolIterator.hasNext()); ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata()); - PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata); + PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata, Optional.empty()); return subscribedTopics.equals(new HashSet<>(subscription.topics())); } }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index c54540eccf5..8aeec3c7299 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -600,7 +600,7 @@ public class ConsumerCoordinatorTest { JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next(); ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata()); - PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata); + PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata, Optional.empty()); metadata.rewind(); return subscription.topics().containsAll(updatedSubscriptionSet); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java index 8a8ba0a82e1..07eafa2b079 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA; @@ -44,6 +45,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME; import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -51,46 +53,64 @@ public class ConsumerProtocolTest { private final TopicPartition tp1 = new TopicPartition("foo", 1); private final TopicPartition tp2 = new TopicPartition("bar", 2); + private final Optional groupInstanceId = Optional.of("instance.id"); @Test public void serializeDeserializeMetadata() { Subscription subscription = new Subscription(Arrays.asList("foo", "bar")); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); - Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); + Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty()); assertEquals(subscription.topics(), parsedSubscription.topics()); assertEquals(0, parsedSubscription.userData().limit()); + assertFalse(parsedSubscription.groupInstanceId().isPresent()); + } + + @Test + public void serializeDeserializeMetadataAndGroupInstanceId() { + Subscription subscription = new Subscription(Arrays.asList("foo", "bar")); + ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); + + Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId); + assertEquals(subscription.topics(), parsedSubscription.topics()); + assertEquals(groupInstanceId, parsedSubscription.groupInstanceId()); } @Test public void serializeDeserializeNullSubscriptionUserData() { Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); - Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); + Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, Optional.empty()); assertEquals(subscription.topics(), parsedSubscription.topics()); assertNull(parsedSubscription.userData()); + assertFalse(parsedSubscription.groupInstanceId().isPresent()); } @Test public void deserializeOldSubscriptionVersion() { Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); - Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); + Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId); assertEquals(parsedSubscription.topics(), parsedSubscription.topics()); assertNull(parsedSubscription.userData()); assertTrue(parsedSubscription.ownedPartitions().isEmpty()); + assertEquals(groupInstanceId, parsedSubscription.groupInstanceId()); } @Test public void deserializeNewSubscriptionWithOldVersion() { - Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2)); + Subscription subscription = new Subscription((short) 1, + Arrays.asList("foo", "bar"), + null, Collections.singletonList(tp2), + Optional.empty()); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0 Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer); header.getShort(VERSION_KEY_NAME); - Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer); + Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer, Optional.empty()); assertEquals(subscription.topics(), parsedSubscription.topics()); assertNull(parsedSubscription.userData()); assertTrue(parsedSubscription.ownedPartitions().isEmpty()); + assertFalse(parsedSubscription.groupInstanceId().isPresent()); } @Test @@ -121,9 +141,10 @@ public class ConsumerProtocolTest { buffer.flip(); - Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer); - assertEquals(Collections.singletonList("topic"), subscription.topics()); - assertEquals(Collections.singletonList(tp2), subscription.ownedPartitions()); + Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer, groupInstanceId); + assertEquals(Collections.singletonList("topic"), parsedSubscription.topics()); + assertEquals(Collections.singletonList(tp2), parsedSubscription.ownedPartitions()); + assertEquals(groupInstanceId, parsedSubscription.groupInstanceId()); } @Test