diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 52f23ed116b..4590c74d6f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -128,7 +128,7 @@ public class ConsumerGroupDescription { } /** - * authorizedOperations for this group + * authorizedOperations for this group, or null if that information is not known. */ public Set authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java index abde1549205..7fb7bd1e10f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java @@ -45,6 +45,10 @@ public class DescribeClusterOptions extends AbstractOptions> authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e612593a73d..9b37f5a51df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1580,7 +1580,7 @@ public class KafkaAdminClient extends AdminClient { controllerFuture.complete(controller(response)); clusterIdFuture.complete(response.clusterId()); authorizedOperationsFuture.complete( - validAclOperations(response.data().clusterAuthorizedOperations())); + validAclOperations(response.data().clusterAuthorizedOperations())); } private Node controller(MetadataResponse response) { @@ -2741,6 +2741,9 @@ public class KafkaAdminClient extends AdminClient { } private Set validAclOperations(final int authorizedOperations) { + if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) { + return null; + } return Utils.from32BitField(authorizedOperations) .stream() .map(AclOperation::fromCode) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index c6d44e88cab..ea9bf057c49 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -70,7 +70,7 @@ public class TopicDescription { * @param internal Whether the topic is internal to Kafka * @param partitions A list of partitions where the index represents the partition id and the element contains * leadership and replica information for that partition. - * @param authorizedOperations authorized operations for this topic + * @param authorizedOperations authorized operations for this topic, or null if this is not known. */ TopicDescription(String name, boolean internal, List partitions, Set authorizedOperations) { @@ -104,7 +104,7 @@ public class TopicDescription { } /** - * authorized operations for this topic + * authorized operations for this topic, or null if this is not known. */ public Set authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java index 7befd6e9d36..671069775ca 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java @@ -108,6 +108,9 @@ public enum AclOperation { */ IDEMPOTENT_WRITE((byte) 12); + // Note: we cannot have more than 30 ACL operations without modifying the format used + // to describe ACL operations in MetadataResponse. + private final static HashMap CODE_TO_VALUE = new HashMap<>(); static { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 61b9ea2e1da..823512f857d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -85,6 +85,25 @@ public class DescribeGroupsResponse extends AbstractResponse { return groupMetada; } + public static DescribedGroup groupMetadata( + final String groupId, + final Errors error, + final String state, + final String protocolType, + final String protocol, + final List members, + final int authorizedOperations) { + DescribedGroup groupMetada = new DescribedGroup(); + groupMetada.setGroupId(groupId) + .setErrorCode(error.code()) + .setGroupState(state) + .setProtocolType(protocolType) + .setProtocolData(protocol) + .setMembers(members) + .setAuthorizedOperations(authorizedOperations); + return groupMetada; + } + public DescribeGroupsResponseData data() { return data; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 3455d5b69c5..39b6180eed6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -55,6 +55,8 @@ import java.util.stream.Collectors; public class MetadataResponse extends AbstractResponse { public static final int NO_CONTROLLER_ID = -1; + public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE; + private MetadataResponseData data; public MetadataResponse(MetadataResponseData data) { @@ -421,7 +423,8 @@ public class MetadataResponse extends AbstractResponse { public static MetadataResponse prepareResponse(int throttleTimeMs, List brokers, String clusterId, int controllerId, List topicMetadataList) { - return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList, 0); + return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); } public static MetadataResponse prepareResponse(List brokers, String clusterId, int controllerId, diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json index e0cbc1ec8f5..dd8525a3069 100644 --- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json +++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json @@ -53,7 +53,7 @@ { "name": "MemberAssignment", "type": "bytes", "versions": "0+", "about": "The current assignment provided by the group leader." } ]}, - { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", + { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this group." } ]} ] diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json index f54ef2805fb..bb09cdb4973 100644 --- a/clients/src/main/resources/common/message/MetadataResponse.json +++ b/clients/src/main/resources/common/message/MetadataResponse.json @@ -77,10 +77,10 @@ { "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true, "about": "The set of offline replicas of this partition." } ]}, - { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", + { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this topic." } ]}, - { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", + { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this cluster." } ] } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 567d578cc40..e2341fb8401 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -104,6 +104,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -493,11 +494,12 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(), initializedCluster.clusterResource().clusterId(), 1, singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false, - singletonList(partitionMetadata))))); + singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)))); DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic)); Map topicDescriptions = result.all().get(); assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader()); + assertEquals(null, topicDescriptions.get(topic).authorizedOperations()); } } @@ -924,6 +926,61 @@ public class KafkaAdminClientTest { } } + @Test + public void testDescribeCluster() throws Exception { + final HashMap nodes = new HashMap<>(); + Node node0 = new Node(0, "localhost", 8121); + Node node1 = new Node(1, "localhost", 8122); + Node node2 = new Node(2, "localhost", 8123); + Node node3 = new Node(3, "localhost", 8124); + nodes.put(0, node0); + nodes.put(1, node1); + nodes.put(2, node2); + nodes.put(3, node3); + + final Cluster cluster = new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Prepare the metadata response used for the first describe cluster + MetadataResponse response = MetadataResponse.prepareResponse(0, + new ArrayList<>(nodes.values()), + env.cluster().clusterResource().clusterId(), + 2, + Collections.emptyList(), + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); + env.kafkaClient().prepareResponse(response); + + // Prepare the metadata response used for the second describe cluster + MetadataResponse response2 = MetadataResponse.prepareResponse(0, + new ArrayList<>(nodes.values()), + env.cluster().clusterResource().clusterId(), + 3, + Collections.emptyList(), + 1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code()); + env.kafkaClient().prepareResponse(response2); + + // Test DescribeCluster with the authorized operations omitted. + final DescribeClusterResult result = env.adminClient().describeCluster(); + assertEquals(env.cluster().clusterResource().clusterId(), result.clusterId().get()); + assertEquals(2, result.controller().get().id()); + assertEquals(null, result.authorizedOperations().get()); + + // Test DescribeCluster with the authorized operations included. + final DescribeClusterResult result2 = env.adminClient().describeCluster(); + assertEquals(env.cluster().clusterResource().clusterId(), result2.clusterId().get()); + assertEquals(3, result2.controller().get().id()); + assertEquals(new HashSet<>(Arrays.asList(AclOperation.DESCRIBE, AclOperation.ALTER)), + result2.authorizedOperations().get()); + } + } + @Test public void testListConsumerGroups() throws Exception { final HashMap nodes = new HashMap<>(); @@ -1230,6 +1287,43 @@ public class KafkaAdminClientTest { } } + @Test + public void testDescribeConsumerGroupsWithAuthorizedOperationsOmitted() throws Exception { + final HashMap nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller())); + + DescribeGroupsResponseData data = new DescribeGroupsResponseData(); + data.groups().add(DescribeGroupsResponse.groupMetadata( + "group-0", + Errors.NONE, + "", + ConsumerProtocol.PROTOCOL_TYPE, + "", + Collections.emptyList(), + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)); + + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + + final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0")); + final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get(); + + assertNull(groupDescription.authorizedOperations()); + } + } + @Test public void testDescribeConsumerGroupOffsets() throws Exception { final HashMap nodes = new HashMap<>();