Browse Source

MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812)

Authorized operations must be null when talking to a pre-KIP-430 broker.
If we present this as the empty set instead, it is impossible for clients
to know if they have no permissions, or are talking to an old broker.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
pull/6814/head
Colin Patrick McCabe 6 years ago committed by GitHub
parent
commit
24f664aa16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
  2. 4
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
  3. 3
      clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
  4. 5
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  5. 4
      clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
  6. 3
      clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
  7. 19
      clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
  8. 5
      clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
  9. 2
      clients/src/main/resources/common/message/DescribeGroupsResponse.json
  10. 4
      clients/src/main/resources/common/message/MetadataResponse.json
  11. 96
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

2
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java

@ -128,7 +128,7 @@ public class ConsumerGroupDescription { @@ -128,7 +128,7 @@ public class ConsumerGroupDescription {
}
/**
* authorizedOperations for this group
* authorizedOperations for this group, or null if that information is not known.
*/
public Set<AclOperation> authorizedOperations() {
return authorizedOperations;

4
clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java

@ -45,6 +45,10 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio @@ -45,6 +45,10 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
return this;
}
/**
* Specify if authorized operations should be included in the response. Note that some
* older brokers cannot not supply this information even if it is requested.
*/
public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}

3
clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java

@ -71,7 +71,8 @@ public class DescribeClusterResult { @@ -71,7 +71,8 @@ public class DescribeClusterResult {
}
/**
* Returns a future which yields authorized operations.
* Returns a future which yields authorized operations. The future value will be non-null if the
* broker supplied this information, and null otherwise.
*/
public KafkaFuture<Set<AclOperation>> authorizedOperations() {
return authorizedOperations;

5
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -1580,7 +1580,7 @@ public class KafkaAdminClient extends AdminClient { @@ -1580,7 +1580,7 @@ public class KafkaAdminClient extends AdminClient {
controllerFuture.complete(controller(response));
clusterIdFuture.complete(response.clusterId());
authorizedOperationsFuture.complete(
validAclOperations(response.data().clusterAuthorizedOperations()));
validAclOperations(response.data().clusterAuthorizedOperations()));
}
private Node controller(MetadataResponse response) {
@ -2741,6 +2741,9 @@ public class KafkaAdminClient extends AdminClient { @@ -2741,6 +2741,9 @@ public class KafkaAdminClient extends AdminClient {
}
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
return null;
}
return Utils.from32BitField(authorizedOperations)
.stream()
.map(AclOperation::fromCode)

4
clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java

@ -70,7 +70,7 @@ public class TopicDescription { @@ -70,7 +70,7 @@ public class TopicDescription {
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
* leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic
* @param authorizedOperations authorized operations for this topic, or null if this is not known.
*/
TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations) {
@ -104,7 +104,7 @@ public class TopicDescription { @@ -104,7 +104,7 @@ public class TopicDescription {
}
/**
* authorized operations for this topic
* authorized operations for this topic, or null if this is not known.
*/
public Set<AclOperation> authorizedOperations() {
return authorizedOperations;

3
clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java

@ -108,6 +108,9 @@ public enum AclOperation { @@ -108,6 +108,9 @@ public enum AclOperation {
*/
IDEMPOTENT_WRITE((byte) 12);
// Note: we cannot have more than 30 ACL operations without modifying the format used
// to describe ACL operations in MetadataResponse.
private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
static {

19
clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java

@ -85,6 +85,25 @@ public class DescribeGroupsResponse extends AbstractResponse { @@ -85,6 +85,25 @@ public class DescribeGroupsResponse extends AbstractResponse {
return groupMetada;
}
public static DescribedGroup groupMetadata(
final String groupId,
final Errors error,
final String state,
final String protocolType,
final String protocol,
final List<DescribedGroupMember> members,
final int authorizedOperations) {
DescribedGroup groupMetada = new DescribedGroup();
groupMetada.setGroupId(groupId)
.setErrorCode(error.code())
.setGroupState(state)
.setProtocolType(protocolType)
.setProtocolData(protocol)
.setMembers(members)
.setAuthorizedOperations(authorizedOperations);
return groupMetada;
}
public DescribeGroupsResponseData data() {
return data;
}

5
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java

@ -55,6 +55,8 @@ import java.util.stream.Collectors; @@ -55,6 +55,8 @@ import java.util.stream.Collectors;
public class MetadataResponse extends AbstractResponse {
public static final int NO_CONTROLLER_ID = -1;
public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
private MetadataResponseData data;
public MetadataResponse(MetadataResponseData data) {
@ -421,7 +423,8 @@ public class MetadataResponse extends AbstractResponse { @@ -421,7 +423,8 @@ public class MetadataResponse extends AbstractResponse {
public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
int controllerId, List<TopicMetadata> topicMetadataList) {
return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList, 0);
return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList,
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
}
public static MetadataResponse prepareResponse(List<Node> brokers, String clusterId, int controllerId,

2
clients/src/main/resources/common/message/DescribeGroupsResponse.json

@ -53,7 +53,7 @@ @@ -53,7 +53,7 @@
{ "name": "MemberAssignment", "type": "bytes", "versions": "0+",
"about": "The current assignment provided by the group leader." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "3+",
{ "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
]}
]

4
clients/src/main/resources/common/message/MetadataResponse.json

@ -77,10 +77,10 @@ @@ -77,10 +77,10 @@
{ "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true,
"about": "The set of offline replicas of this partition." }
]},
{ "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+",
{ "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this topic." }
]},
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+",
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this cluster." }
]
}

96
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -104,6 +104,7 @@ import org.slf4j.LoggerFactory; @@ -104,6 +104,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -493,11 +494,12 @@ public class KafkaAdminClientTest { @@ -493,11 +494,12 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
initializedCluster.clusterResource().clusterId(), 1,
singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
singletonList(partitionMetadata)))));
singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic));
Map<String, TopicDescription> topicDescriptions = result.all().get();
assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());
assertEquals(null, topicDescriptions.get(topic).authorizedOperations());
}
}
@ -924,6 +926,61 @@ public class KafkaAdminClientTest { @@ -924,6 +926,61 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testDescribeCluster() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
Node node0 = new Node(0, "localhost", 8121);
Node node1 = new Node(1, "localhost", 8122);
Node node2 = new Node(2, "localhost", 8123);
Node node3 = new Node(3, "localhost", 8124);
nodes.put(0, node0);
nodes.put(1, node1);
nodes.put(2, node2);
nodes.put(3, node3);
final Cluster cluster = new Cluster(
"mockClusterId",
nodes.values(),
Collections.emptyList(),
Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Prepare the metadata response used for the first describe cluster
MetadataResponse response = MetadataResponse.prepareResponse(0,
new ArrayList<>(nodes.values()),
env.cluster().clusterResource().clusterId(),
2,
Collections.emptyList(),
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
env.kafkaClient().prepareResponse(response);
// Prepare the metadata response used for the second describe cluster
MetadataResponse response2 = MetadataResponse.prepareResponse(0,
new ArrayList<>(nodes.values()),
env.cluster().clusterResource().clusterId(),
3,
Collections.emptyList(),
1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code());
env.kafkaClient().prepareResponse(response2);
// Test DescribeCluster with the authorized operations omitted.
final DescribeClusterResult result = env.adminClient().describeCluster();
assertEquals(env.cluster().clusterResource().clusterId(), result.clusterId().get());
assertEquals(2, result.controller().get().id());
assertEquals(null, result.authorizedOperations().get());
// Test DescribeCluster with the authorized operations included.
final DescribeClusterResult result2 = env.adminClient().describeCluster();
assertEquals(env.cluster().clusterResource().clusterId(), result2.clusterId().get());
assertEquals(3, result2.controller().get().id());
assertEquals(new HashSet<>(Arrays.asList(AclOperation.DESCRIBE, AclOperation.ALTER)),
result2.authorizedOperations().get());
}
}
@Test
public void testListConsumerGroups() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
@ -1230,6 +1287,43 @@ public class KafkaAdminClientTest { @@ -1230,6 +1287,43 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testDescribeConsumerGroupsWithAuthorizedOperationsOmitted() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
final Cluster cluster =
new Cluster(
"mockClusterId",
nodes.values(),
Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
DescribeGroupsResponseData data = new DescribeGroupsResponseData();
data.groups().add(DescribeGroupsResponse.groupMetadata(
"group-0",
Errors.NONE,
"",
ConsumerProtocol.PROTOCOL_TYPE,
"",
Collections.emptyList(),
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList("group-0"));
final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get();
assertNull(groupDescription.authorizedOperations());
}
}
@Test
public void testDescribeConsumerGroupOffsets() throws Exception {
final HashMap<Integer, Node> nodes = new HashMap<>();

Loading…
Cancel
Save