Browse Source

MINOR: Temporarily remove the apiVersions API for 0.11

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3324 from cmccabe/removeApiVersions
pull/3319/merge
Colin P. Mccabe 8 years ago committed by Ismael Juma
parent
commit
32f0f1f50a
  1. 21
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
  2. 37
      clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
  3. 63
      clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResult.java
  4. 35
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  5. 10
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

21
clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
@ -192,26 +191,6 @@ public abstract class AdminClient implements AutoCloseable { @@ -192,26 +191,6 @@ public abstract class AdminClient implements AutoCloseable {
*/
public abstract DescribeClusterResult describeCluster(DescribeClusterOptions options);
/**
* Get information about the api versions of nodes in the cluster with the default options.
* See {@link AdminClient#apiVersions(Collection<Node>, ApiVersionsOptions)}
*
* @param nodes The nodes to get information about, or null to get information about all nodes.
* @return The ApiVersionsResult.
*/
public ApiVersionsResult apiVersions(Collection<Node> nodes) {
return apiVersions(nodes, new ApiVersionsOptions());
}
/**
* Get information about the api versions of nodes in the cluster.
*
* @param nodes The nodes to get information about, or null to get information about all nodes.
* @param options The options to use when getting api versions of the nodes.
* @return The ApiVersionsResult.
*/
public abstract ApiVersionsResult apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
/**
* Similar to #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions)},
* but uses the default options.

37
clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java

@ -1,37 +0,0 @@ @@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Options for the apiVersions call.
*/
@InterfaceStability.Unstable
public class ApiVersionsOptions {
private Integer timeoutMs = null;
public ApiVersionsOptions timeoutMs(Integer timeoutMs) {
this.timeoutMs = timeoutMs;
return this;
}
public Integer timeoutMs() {
return timeoutMs;
}
}

63
clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResult.java

@ -1,63 +0,0 @@ @@ -1,63 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* Results of the apiVersions call.
*/
@InterfaceStability.Unstable
public class ApiVersionsResult {
private final Map<Node, KafkaFuture<NodeApiVersions>> futures;
ApiVersionsResult(Map<Node, KafkaFuture<NodeApiVersions>> futures) {
this.futures = futures;
}
public Map<Node, KafkaFuture<NodeApiVersions>> results() {
return futures;
}
public KafkaFuture<Map<Node, NodeApiVersions>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
thenApply(new KafkaFuture.Function<Void, Map<Node, NodeApiVersions>>() {
@Override
public Map<Node, NodeApiVersions> apply(Void v) {
Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : futures.entrySet()) {
try {
versions.put(entry.getKey(), entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, because allOf ensured that all the futures
// completed successfully.
throw new RuntimeException(e);
}
}
return versions;
}
});
}
}

35
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -24,7 +24,6 @@ import org.apache.kafka.clients.ClientUtils; @@ -24,7 +24,6 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.common.Cluster;
@ -59,8 +58,6 @@ import org.apache.kafka.common.requests.AbstractRequest; @@ -59,8 +58,6 @@ import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
import org.apache.kafka.common.requests.CreateAclsResponse;
@ -1263,38 +1260,6 @@ public class KafkaAdminClient extends AdminClient { @@ -1263,38 +1260,6 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeClusterResult(describeClusterFuture, controllerFuture, clusterIdFuture);
}
@Override
public ApiVersionsResult apiVersions(Collection<Node> nodes, ApiVersionsOptions options) {
final long now = time.milliseconds();
final long deadlineMs = calcDeadlineMs(now, options.timeoutMs());
Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>();
for (final Node node : nodes) {
if (nodeFutures.get(node) != null)
continue;
final KafkaFutureImpl<NodeApiVersions> nodeFuture = new KafkaFutureImpl<>();
nodeFutures.put(node, nodeFuture);
runnable.call(new Call("apiVersions", deadlineMs, new ConstantNodeProvider(node)) {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
return new ApiVersionsRequest.Builder();
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
ApiVersionsResponse response = (ApiVersionsResponse) abstractResponse;
nodeFuture.complete(new NodeApiVersions(response.apiVersions()));
}
@Override
public void handleFailure(Throwable throwable) {
nodeFuture.completeExceptionally(throwable);
}
}, now);
}
return new ApiVersionsResult(nodeFutures);
}
@Override
public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAclsOptions options) {
final long now = time.milliseconds();

10
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -199,21 +199,19 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { @@ -199,21 +199,19 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
}
@Test
def testGetAllBrokerVersionsAndDescribeCluster(): Unit = {
def testDescribeCluster(): Unit = {
client = AdminClient.create(createConfig())
val nodes = client.describeCluster().nodes().get()
val nodes = client.describeCluster.nodes.get()
val clusterId = client.describeCluster().clusterId().get()
assertEquals(servers.head.apis.clusterId, clusterId)
val controller = client.describeCluster().controller().get()
assertEquals(servers.head.apis.metadataCache.getControllerId.
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
val nodesToVersions = client.apiVersions(nodes).all().get()
val brokers = brokerList.split(",")
assert(brokers.size == nodesToVersions.size())
for ((node, brokerVersionInfo) <- nodesToVersions.asScala) {
assertEquals(brokers.size, nodes.size)
for (node <- nodes.asScala) {
val hostStr = s"${node.host}:${node.port}"
assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
}
}

Loading…
Cancel
Save