From 24f664aa1621dc70794fd6576ac99547e41d2113 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Tue, 28 May 2019 11:22:09 -0700 Subject: [PATCH] MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812) Authorized operations must be null when talking to a pre-KIP-430 broker. If we present this as the empty set instead, it is impossible for clients to know if they have no permissions, or are talking to an old broker. Reviewers: Manikumar Reddy --- .../admin/ConsumerGroupDescription.java | 2 +- .../clients/admin/DescribeClusterOptions.java | 4 + .../clients/admin/DescribeClusterResult.java | 3 +- .../kafka/clients/admin/KafkaAdminClient.java | 5 +- .../kafka/clients/admin/TopicDescription.java | 4 +- .../apache/kafka/common/acl/AclOperation.java | 3 + .../requests/DescribeGroupsResponse.java | 19 ++++ .../common/requests/MetadataResponse.java | 5 +- .../message/DescribeGroupsResponse.json | 2 +- .../common/message/MetadataResponse.json | 4 +- .../clients/admin/KafkaAdminClientTest.java | 96 ++++++++++++++++++- 11 files changed, 137 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 52f23ed116b..4590c74d6f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -128,7 +128,7 @@ public class ConsumerGroupDescription { } /** - * authorizedOperations for this group + * authorizedOperations for this group, or null if that information is not known. */ public Set authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java index abde1549205..7fb7bd1e10f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java @@ -45,6 +45,10 @@ public class DescribeClusterOptions extends AbstractOptions> authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e612593a73d..9b37f5a51df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1580,7 +1580,7 @@ public class KafkaAdminClient extends AdminClient { controllerFuture.complete(controller(response)); clusterIdFuture.complete(response.clusterId()); authorizedOperationsFuture.complete( - validAclOperations(response.data().clusterAuthorizedOperations())); + validAclOperations(response.data().clusterAuthorizedOperations())); } private Node controller(MetadataResponse response) { @@ -2741,6 +2741,9 @@ public class KafkaAdminClient extends AdminClient { } private Set validAclOperations(final int authorizedOperations) { + if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) { + return null; + } return Utils.from32BitField(authorizedOperations) .stream() .map(AclOperation::fromCode) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index c6d44e88cab..ea9bf057c49 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -70,7 +70,7 @@ public class TopicDescription { * @param internal Whether the topic is internal to Kafka * @param partitions A list of partitions where the index represents the partition id and the element contains * leadership and replica information for that partition. - * @param authorizedOperations authorized operations for this topic + * @param authorizedOperations authorized operations for this topic, or null if this is not known. */ TopicDescription(String name, boolean internal, List partitions, Set authorizedOperations) { @@ -104,7 +104,7 @@ public class TopicDescription { } /** - * authorized operations for this topic + * authorized operations for this topic, or null if this is not known. */ public Set authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java index 7befd6e9d36..671069775ca 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java @@ -108,6 +108,9 @@ public enum AclOperation { */ IDEMPOTENT_WRITE((byte) 12); + // Note: we cannot have more than 30 ACL operations without modifying the format used + // to describe ACL operations in MetadataResponse. + private final static HashMap CODE_TO_VALUE = new HashMap<>(); static { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 61b9ea2e1da..823512f857d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -85,6 +85,25 @@ public class DescribeGroupsResponse extends AbstractResponse { return groupMetada; } + public static DescribedGroup groupMetadata( + final String groupId, + final Errors error, + final String state, + final String protocolType, + final String protocol, + final List members, + final int authorizedOperations) { + DescribedGroup groupMetada = new DescribedGroup(); + groupMetada.setGroupId(groupId) + .setErrorCode(error.code()) + .setGroupState(state) + .setProtocolType(protocolType) + .setProtocolData(protocol) + .setMembers(members) + .setAuthorizedOperations(authorizedOperations); + return groupMetada; + } + public DescribeGroupsResponseData data() { return data; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 3455d5b69c5..39b6180eed6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -55,6 +55,8 @@ import java.util.stream.Collectors; public class MetadataResponse extends AbstractResponse { public static final int NO_CONTROLLER_ID = -1; + public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE; + private MetadataResponseData data; public MetadataResponse(MetadataResponseData data) { @@ -421,7 +423,8 @@ public class MetadataResponse extends AbstractResponse { public static MetadataResponse prepareResponse(int throttleTimeMs, List brokers, String clusterId, int controllerId, List topicMetadataList) { - return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList, 0); + return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); } public static MetadataResponse prepareResponse(List brokers, String clusterId, int controllerId, diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json index e0cbc1ec8f5..dd8525a3069 100644 --- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json +++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json @@ -53,7 +53,7 @@ { "name": "MemberAssignment", "type": "bytes", "versions": "0+", "about": "The current assignment provided by the group leader." } ]}, - { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", + { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this group." } ]} ] diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json index f54ef2805fb..bb09cdb4973 100644 --- a/clients/src/main/resources/common/message/MetadataResponse.json +++ b/clients/src/main/resources/common/message/MetadataResponse.json @@ -77,10 +77,10 @@ { "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true, "about": "The set of offline replicas of this partition." } ]}, - { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", + { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this topic." } ]}, - { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", + { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this cluster." } ] } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 567d578cc40..e2341fb8401 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -104,6 +104,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -493,11 +494,12 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(), initializedCluster.clusterResource().clusterId(), 1, singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false, - singletonList(partitionMetadata))))); + singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)))); DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic)); Map topicDescriptions = result.all().get(); assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader()); + assertEquals(null, topicDescriptions.get(topic).authorizedOperations()); } } @@ -924,6 +926,61 @@ public class KafkaAdminClientTest { } } + @Test + public void testDescribeCluster() throws Exception { + final HashMap nodes = new HashMap<>(); + Node node0 = new Node(0, "localhost", 8121); + Node node1 = new Node(1, "localhost", 8122); + Node node2 = new Node(2, "localhost", 8123); + Node node3 = new Node(3, "localhost", 8124); + nodes.put(0, node0); + nodes.put(1, node1); + nodes.put(2, node2); + nodes.put(3, node3); + + final Cluster cluster = new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Prepare the metadata response used for the first describe cluster + MetadataResponse response = MetadataResponse.prepareResponse(0, + new ArrayList<>(nodes.values()), + env.cluster().clusterResource().clusterId(), + 2, + Collections.emptyList(), + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); + env.kafkaClient().prepareResponse(response); + + // Prepare the metadata response used for the second describe cluster + MetadataResponse response2 = MetadataResponse.prepareResponse(0, + new ArrayList<>(nodes.values()), + env.cluster().clusterResource().clusterId(), + 3, + Collections.emptyList(), + 1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code()); + env.kafkaClient().prepareResponse(response2); + + // Test DescribeCluster with the authorized operations omitted. + final DescribeClusterResult result = env.adminClient().describeCluster(); + assertEquals(env.cluster().clusterResource().clusterId(), result.clusterId().get()); + assertEquals(2, result.controller().get().id()); + assertEquals(null, result.authorizedOperations().get()); + + // Test DescribeCluster with the authorized operations included. + final DescribeClusterResult result2 = env.adminClient().describeCluster(); + assertEquals(env.cluster().clusterResource().clusterId(), result2.clusterId().get()); + assertEquals(3, result2.controller().get().id()); + assertEquals(new HashSet<>(Arrays.asList(AclOperation.DESCRIBE, AclOperation.ALTER)), + result2.authorizedOperations().get()); + } + } + @Test public void testListConsumerGroups() throws Exception { final HashMap nodes = new HashMap<>(); @@ -1230,6 +1287,43 @@ public class KafkaAdminClientTest { } } + @Test + public void testDescribeConsumerGroupsWithAuthorizedOperationsOmitted() throws Exception { + final HashMap nodes = new HashMap<>(); + nodes.put(0, new Node(0, "localhost", 8121)); + + final Cluster cluster = + new Cluster( + "mockClusterId", + nodes.values(), + Collections.emptyList(), + Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller())); + + DescribeGroupsResponseData data = new DescribeGroupsResponseData(); + data.groups().add(DescribeGroupsResponse.groupMetadata( + "group-0", + Errors.NONE, + "", + ConsumerProtocol.PROTOCOL_TYPE, + "", + Collections.emptyList(), + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)); + + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + + final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0")); + final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get(); + + assertNull(groupDescription.authorizedOperations()); + } + } + @Test public void testDescribeConsumerGroupOffsets() throws Exception { final HashMap nodes = new HashMap<>();