
KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values() (#9007)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, David Jacot <djacot@confluent.io>, Lee Dongjin <dongjin@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Author: Tom Bentley (committed by GitHub)
Commit: 819cd454f9
Files changed:
  1. clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java (84 lines changed)
  2. clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java (53 lines changed)
  3. clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java (64 lines changed)
  4. clients/src/main/java/org/apache/kafka/clients/admin/ReplicaInfo.java (70 lines changed)
  5. clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java (33 lines changed)
  6. clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java (263 lines changed)
  7. core/src/main/scala/kafka/admin/LogDirsCommand.scala (9 lines changed)
  8. core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala (2 lines changed)
  9. core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala (2 lines changed)
  10. core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala (2 lines changed)
  11. core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala (26 lines changed)
  12. streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java (15 lines changed)
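
In short, callers migrate from the deprecated values()/all() (which exposed the internal DescribeLogDirsResponse.LogDirInfo type) to the new descriptions()/allDescriptions() accessors returning LogDirDescription and ReplicaInfo. A minimal post-migration sketch, assuming a 2.7+ kafka-clients jar; the bootstrap address and broker id 0 are illustrative, not part of the patch:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class DescribeLogDirsMigration {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        try (Admin admin = Admin.create(props)) {
            // New API: one map per broker, keyed by log directory path.
            Map<Integer, Map<String, LogDirDescription>> byBroker =
                    admin.describeLogDirs(Collections.singleton(0)).allDescriptions().get();
            for (Map.Entry<Integer, Map<String, LogDirDescription>> broker : byBroker.entrySet()) {
                for (Map.Entry<String, LogDirDescription> dir : broker.getValue().entrySet()) {
                    LogDirDescription desc = dir.getValue();
                    if (desc.error() != null) { // a null error means the directory is healthy
                        System.out.printf("broker %d, dir %s: %s%n", broker.getKey(), dir.getKey(), desc.error());
                        continue;
                    }
                    for (Map.Entry<TopicPartition, ReplicaInfo> replica : desc.replicaInfos().entrySet()) {
                        System.out.printf("broker %d, dir %s, %s: %d bytes%n",
                                broker.getKey(), dir.getKey(), replica.getKey(), replica.getValue().size());
                    }
                }
            }
        }
    }
}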

clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java (84 lines changed)

@@ -23,7 +23,11 @@ import java.util.HashMap;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;

/**
@@ -33,38 +37,82 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
 */
@InterfaceStability.Evolving
public class DescribeLogDirsResult {
-    private final Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> futures;
+    private final Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> futures;

-    DescribeLogDirsResult(Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> futures) {
+    DescribeLogDirsResult(Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> futures) {
        this.futures = futures;
    }

    /**
-     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker
+     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker.
+     * @deprecated Deprecated Since Kafka 2.7. Use {@link #descriptions()}.
     */
-    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values() {
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> values() {
+        return descriptions().entrySet().stream()
+                .collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        entry -> entry.getValue().thenApply(map -> convertMapValues(map))));
+    }
+
+    @SuppressWarnings("deprecation")
+    private Map<String, DescribeLogDirsResponse.LogDirInfo> convertMapValues(Map<String, LogDirDescription> map) {
+        Stream<Map.Entry<String, LogDirDescription>> stream = map.entrySet().stream();
+        return stream.collect(Collectors.toMap(
+                Map.Entry::getKey,
+                infoEntry -> {
+                    LogDirDescription logDir = infoEntry.getValue();
+                    return new DescribeLogDirsResponse.LogDirInfo(logDir.error() == null ? Errors.NONE : Errors.forException(logDir.error()),
+                            logDir.replicaInfos().entrySet().stream().collect(Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    replicaEntry -> new DescribeLogDirsResponse.ReplicaInfo(
+                                            replicaEntry.getValue().size(),
+                                            replicaEntry.getValue().offsetLag(),
+                                            replicaEntry.getValue().isFuture())
+                            )));
+                }));
+    }
+
+    /**
+     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker.
+     * The result of the future is a map from broker log directory path to a description of that log directory.
+     */
+    public Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the brokers have responded without error
+     * @deprecated Deprecated Since Kafka 2.7. Use {@link #allDescriptions()}.
+     */
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public KafkaFuture<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> all() {
+        return allDescriptions().thenApply(map -> map.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey(),
+                entry -> convertMapValues(entry.getValue())
+        )));
+    }
+
+    /**
+     * Return a future which succeeds only if all the brokers have responded without error.
     * The result of the future is a map from brokerId to a map from broker log directory path
     * to a description of that log directory.
     */
-    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all() {
+    public KafkaFuture<Map<Integer, Map<String, LogDirDescription>>> allDescriptions() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.BaseFunction<Void, Map<Integer, Map<String, LogDirInfo>>>() {
-                @Override
-                public Map<Integer, Map<String, LogDirInfo>> apply(Void v) {
-                    Map<Integer, Map<String, LogDirInfo>> descriptions = new HashMap<>(futures.size());
-                    for (Map.Entry<Integer, KafkaFuture<Map<String, LogDirInfo>>> entry : futures.entrySet()) {
-                        try {
-                            descriptions.put(entry.getKey(), entry.getValue().get());
-                        } catch (InterruptedException | ExecutionException e) {
-                            // This should be unreachable, because allOf ensured that all the futures completed successfully.
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    return descriptions;
-                }
+            thenApply(v -> {
+                Map<Integer, Map<String, LogDirDescription>> descriptions = new HashMap<>(futures.size());
+                for (Map.Entry<Integer, KafkaFuture<Map<String, LogDirDescription>>> entry : futures.entrySet()) {
+                    try {
+                        descriptions.put(entry.getKey(), entry.getValue().get());
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                        throw new RuntimeException(e);
+                    }
+                }
+                return descriptions;
            });
    }
}

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java (53 lines changed)

@@ -61,6 +61,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -110,6 +111,7 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
@@ -2311,11 +2313,11 @@ public class KafkaAdminClient extends AdminClient {
    @Override
    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
-        final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size());
+        final Map<Integer, KafkaFutureImpl<Map<String, LogDirDescription>>> futures = new HashMap<>(brokers.size());
        final long now = time.milliseconds();
        for (final Integer brokerId : brokers) {
-            KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future = new KafkaFutureImpl<>();
+            KafkaFutureImpl<Map<String, LogDirDescription>> future = new KafkaFutureImpl<>();
            futures.put(brokerId, future);

            runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()),
@@ -2327,13 +2329,15 @@ public class KafkaAdminClient extends AdminClient {
                    return new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null));
                }

+                @SuppressWarnings("deprecation")
                @Override
                public void handleResponse(AbstractResponse abstractResponse) {
                    DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                    if (response.logDirInfos().size() > 0) {
-                        future.complete(response.logDirInfos());
+                    Map<String, LogDirDescription> descriptions = logDirDescriptions(response);
+                    if (descriptions.size() > 0) {
+                        future.complete(descriptions);
                    } else {
-                        // response.logDirInfos() will be empty if and only if the user is not authorized to describe clsuter resource.
+                        // descriptions will be empty if and only if the user is not authorized to describe cluster resource.
                        future.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
                    }
                }
@@ -2347,6 +2351,22 @@ public class KafkaAdminClient extends AdminClient {
        return new DescribeLogDirsResult(new HashMap<>(futures));
    }

+    private static Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) {
+        Map<String, LogDirDescription> result = new HashMap<>(response.data().results().size());
+        for (DescribeLogDirsResponseData.DescribeLogDirsResult logDirResult : response.data().results()) {
+            Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>();
+            for (DescribeLogDirsResponseData.DescribeLogDirsTopic t : logDirResult.topics()) {
+                for (DescribeLogDirsResponseData.DescribeLogDirsPartition p : t.partitions()) {
+                    replicaInfoMap.put(
+                            new TopicPartition(t.name(), p.partitionIndex()),
+                            new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
+                }
+            }
+            result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap));
+        }
+        return result;
+    }
+
    @Override
    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) {
        final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
@@ -2395,32 +2415,31 @@ public class KafkaAdminClient extends AdminClient {
                @Override
                public void handleResponse(AbstractResponse abstractResponse) {
                    DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> responseEntry: response.logDirInfos().entrySet()) {
+                    for (Map.Entry<String, LogDirDescription> responseEntry: logDirDescriptions(response).entrySet()) {
                        String logDir = responseEntry.getKey();
-                        DescribeLogDirsResponse.LogDirInfo logDirInfo = responseEntry.getValue();
+                        LogDirDescription logDirInfo = responseEntry.getValue();

                        // No replica info will be provided if the log directory is offline
-                        if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR)
+                        if (logDirInfo.error() instanceof KafkaStorageException)
                            continue;
-                        if (logDirInfo.error != Errors.NONE)
+                        if (logDirInfo.error() != null)
                            handleFailure(new IllegalStateException(
-                                "The error " + logDirInfo.error + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
+                                "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));

-                        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos.entrySet()) {
+                        for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) {
                            TopicPartition tp = replicaInfoEntry.getKey();
-                            DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
+                            ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
                            ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
                            if (replicaLogDirInfo == null) {
-                                handleFailure(new IllegalStateException(
-                                    "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
-                            } else if (replicaInfo.isFuture) {
+                                log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp);
+                            } else if (replicaInfo.isFuture()) {
                                replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
                                                                                        replicaLogDirInfo.getCurrentReplicaOffsetLag(),
                                                                                        logDir,
-                                                                                        replicaInfo.offsetLag));
+                                                                                        replicaInfo.offsetLag()));
                            } else {
                                replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir,
-                                                                                        replicaInfo.offsetLag,
+                                                                                        replicaInfo.offsetLag(),
                                                                                        replicaLogDirInfo.getFutureReplicaLogDir(),
                                                                                        replicaLogDirInfo.getFutureReplicaOffsetLag()));
                            }
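
The new logDirDescriptions() helper converts wire-level error codes with Errors.forCode(code).exception(); Errors.NONE maps to no exception, which is why a healthy directory surfaces as LogDirDescription.error() == null. A tiny sketch of that mapping, assuming only the kafka-clients jar on the classpath:

import org.apache.kafka.common.protocol.Errors;

public class ErrorsToExceptionDemo {
    public static void main(String[] args) {
        // NONE carries no exception, so a successful result yields a null error().
        System.out.println(Errors.NONE.exception());                // null
        // Storage errors map to an exception instance.
        System.out.println(Errors.KAFKA_STORAGE_ERROR.exception()); // a KafkaStorageException
    }
}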

clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java (64 lines changed, new file)

@@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;

import java.util.Map;

import static java.util.Collections.unmodifiableMap;

/**
 * A description of a log directory on a particular broker.
 */
public class LogDirDescription {
    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
    private final ApiException error;

    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
        this.error = error;
        this.replicaInfos = replicaInfos;
    }

    /**
     * Returns `ApiException` if the log directory is offline or an error occurred, otherwise returns null.
     * <ul>
     * <li> KafkaStorageException - The log directory is offline.
     * <li> UnknownServerException - The server experienced an unexpected error when processing the request.
     * </ul>
     */
    public ApiException error() {
        return error;
    }

    /**
     * A map from topic partition to replica information for that partition
     * in this log directory.
     */
    public Map<TopicPartition, ReplicaInfo> replicaInfos() {
        return unmodifiableMap(replicaInfos);
    }

    @Override
    public String toString() {
        return "LogDirDescription(" +
                "replicaInfos=" + replicaInfos +
                ", error=" + error +
                ')';
    }
}
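
Per the javadoc above, error() is null for a healthy directory, a KafkaStorageException when the directory is offline, and another ApiException (e.g. UnknownServerException) on unexpected failures. A sketch of how calling code might branch on it; the LogDirStatus helper is illustrative, not part of the patch:

import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.common.errors.KafkaStorageException;

final class LogDirStatus {
    // Summarize one log directory description as a human-readable status line.
    static String summarize(String path, LogDirDescription desc) {
        if (desc.error() == null) {
            return path + ": ONLINE, " + desc.replicaInfos().size() + " replicas";
        } else if (desc.error() instanceof KafkaStorageException) {
            return path + ": OFFLINE"; // replicaInfos() is empty for offline directories
        } else {
            return path + ": ERROR (" + desc.error().getClass().getSimpleName() + ")";
        }
    }
}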

clients/src/main/java/org/apache/kafka/clients/admin/ReplicaInfo.java (70 lines changed, new file)

@@ -0,0 +1,70 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.admin;

/**
 * A description of a replica on a particular broker.
 */
public class ReplicaInfo {
    private final long size;
    private final long offsetLag;
    private final boolean isFuture;

    public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
        this.size = size;
        this.offsetLag = offsetLag;
        this.isFuture = isFuture;
    }

    /**
     * The total size of the log segments in this replica in bytes.
     */
    public long size() {
        return size;
    }

    /**
     * The lag of the log's LEO with respect to the partition's
     * high watermark (if it is the current log for the partition)
     * or the current replica's LEO (if it is the {@linkplain #isFuture() future log}
     * for the partition).
     */
    public long offsetLag() {
        return offsetLag;
    }

    /**
     * Whether this replica has been created by an AlterReplicaLogDirsRequest
     * but not yet replaced the current replica on the broker.
     *
     * @return true if this log is created by AlterReplicaLogDirsRequest and will replace the current log
     * of the replica at some time in the future.
     */
    public boolean isFuture() {
        return isFuture;
    }

    @Override
    public String toString() {
        return "ReplicaInfo(" +
                "size=" + size +
                ", offsetLag=" + offsetLag +
                ", isFuture=" + isFuture +
                ')';
    }
}
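
Because these are plain accessors, per-directory aggregates are a one-liner; a hypothetical helper (LogDirSizing is not part of the patch) that totals the bytes used by current, non-future replicas in one directory:

import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;

final class LogDirSizing {
    // Total on-disk size of the current (non-future) replicas in a log directory.
    static long currentReplicaBytes(LogDirDescription desc) {
        return desc.replicaInfos().values().stream()
                .filter(info -> !info.isFuture())
                .mapToLong(ReplicaInfo::size)
                .sum();
    }
}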

clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java (33 lines changed)

@@ -19,9 +19,6 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
-import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsPartition;
-import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsResult;
-import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -68,22 +65,6 @@ public class DescribeLogDirsResponse extends AbstractResponse {
        return errorCounts;
    }

-    public Map<String, LogDirInfo> logDirInfos() {
-        HashMap<String, LogDirInfo> result = new HashMap<>(data.results().size());
-        for (DescribeLogDirsResult logDirResult : data.results()) {
-            Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>();
-            for (DescribeLogDirsTopic t : logDirResult.topics()) {
-                for (DescribeLogDirsPartition p : t.partitions()) {
-                    replicaInfoMap.put(
-                        new TopicPartition(t.name(), p.partitionIndex()),
-                        new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
-                }
-            }
-            result.put(logDirResult.logDir(), new LogDirInfo(Errors.forCode(logDirResult.errorCode()), replicaInfoMap));
-        }
-        return result;
-    }
-
    public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) {
        return new DescribeLogDirsResponse(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version).read(buffer), version);
    }
@@ -94,7 +75,13 @@ public class DescribeLogDirsResponse extends AbstractResponse {
     *
     * KAFKA_STORAGE_ERROR (56)
     * UNKNOWN (-1)
+     *
+     * @deprecated Deprecated Since Kafka 2.7.
+     * Use {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()}
+     * and {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to access the replacement
+     * class {@link org.apache.kafka.clients.admin.LogDirDescription}.
     */
+    @Deprecated
    static public class LogDirInfo {
        public final Errors error;
        public final Map<TopicPartition, ReplicaInfo> replicaInfos;
@@ -117,6 +104,14 @@ public class DescribeLogDirsResponse extends AbstractResponse {
    }

    // Note this class is part of the public API, reachable from Admin.describeLogDirs()
+    /**
+     * @deprecated Deprecated Since Kafka 2.7.
+     * Use {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()}
+     * and {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to access the replacement
+     * class {@link org.apache.kafka.clients.admin.ReplicaInfo}.
+     */
+    @Deprecated
    static public class ReplicaInfo {
        public final long size;

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java (263 lines changed)

@@ -89,6 +89,7 @@ import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
@@ -182,6 +183,7 @@ import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
@@ -1350,6 +1352,263 @@ public class KafkaAdminClientTest {
}
}
private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
return prepareDescribeLogDirsResponse(error, logDir,
prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
}
private static List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) {
return singletonList(new DescribeLogDirsTopic()
.setName(topic)
.setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
.setPartitionIndex(partition)
.setPartitionSize(partitionSize)
.setIsFutureKey(isFuture)
.setOffsetLag(offsetLag))));
}
private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,
List<DescribeLogDirsTopic> topics) {
return new DescribeLogDirsResponse(
new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
.setErrorCode(error.code())
.setLogDir(logDir)
.setTopics(topics)
)));
}
@Test
public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
Set<Integer> brokers = Collections.singleton(0);
String logDir = "/var/data/kafka";
TopicPartition tp = new TopicPartition("topic", 12);
long partitionSize = 1234567890;
long offsetLag = 24;
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, partitionSize, offsetLag),
env.cluster().nodeById(0));
DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
assertEquals(brokers, descriptions.keySet());
assertNotNull(descriptions.get(0));
assertDescriptionContains(descriptions.get(0).get(), logDir, tp, partitionSize, offsetLag);
Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
assertEquals(brokers, allDescriptions.keySet());
assertDescriptionContains(allDescriptions.get(0), logDir, tp, partitionSize, offsetLag);
}
}
private static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
TopicPartition tp, long partitionSize, long offsetLag) {
assertNotNull(descriptionsMap);
assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
assertNull(descriptionsMap.get(logDir).error());
Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get(logDir).replicaInfos();
assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
assertEquals(partitionSize, descriptionsReplicaInfos.get(tp).size());
assertEquals(offsetLag, descriptionsReplicaInfos.get(tp).offsetLag());
assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
}
@SuppressWarnings("deprecation")
@Test
public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException {
Set<Integer> brokers = Collections.singleton(0);
TopicPartition tp = new TopicPartition("topic", 12);
String logDir = "/var/data/kafka";
Errors error = Errors.NONE;
int offsetLag = 24;
long partitionSize = 1234567890;
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(error, logDir, tp, partitionSize, offsetLag),
env.cluster().nodeById(0));
DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
assertEquals(brokers, deprecatedValues.keySet());
assertNotNull(deprecatedValues.get(0));
assertDescriptionContains(deprecatedValues.get(0).get(), logDir, tp, error, offsetLag, partitionSize);
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
assertEquals(brokers, deprecatedAll.keySet());
assertDescriptionContains(deprecatedAll.get(0), logDir, tp, error, offsetLag, partitionSize);
}
}
@SuppressWarnings("deprecation")
private static void assertDescriptionContains(Map<String, DescribeLogDirsResponse.LogDirInfo> descriptionsMap,
String logDir, TopicPartition tp, Errors error,
int offsetLag, long partitionSize) {
assertNotNull(descriptionsMap);
assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
assertEquals(error, descriptionsMap.get(logDir).error);
Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> allReplicaInfos =
descriptionsMap.get(logDir).replicaInfos;
assertEquals(Collections.singleton(tp), allReplicaInfos.keySet());
assertEquals(partitionSize, allReplicaInfos.get(tp).size);
assertEquals(offsetLag, allReplicaInfos.get(tp).offsetLag);
assertFalse(allReplicaInfos.get(tp).isFuture);
}
@Test
public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException {
Set<Integer> brokers = Collections.singleton(0);
String logDir = "/var/data/kafka";
Errors error = Errors.KAFKA_STORAGE_ERROR;
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(error, logDir, emptyList()),
env.cluster().nodeById(0));
DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
assertEquals(brokers, descriptions.keySet());
assertNotNull(descriptions.get(0));
Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
assertEquals(error.exception().getClass(), descriptionsMap.get(logDir).error().getClass());
assertEquals(emptySet(), descriptionsMap.get(logDir).replicaInfos().keySet());
Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
assertEquals(brokers, allDescriptions.keySet());
Map<String, LogDirDescription> allMap = allDescriptions.get(0);
assertNotNull(allMap);
assertEquals(Collections.singleton(logDir), allMap.keySet());
assertEquals(error.exception().getClass(), allMap.get(logDir).error().getClass());
assertEquals(emptySet(), allMap.get(logDir).replicaInfos().keySet());
}
}
@SuppressWarnings("deprecation")
@Test
public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException {
Set<Integer> brokers = Collections.singleton(0);
String logDir = "/var/data/kafka";
Errors error = Errors.KAFKA_STORAGE_ERROR;
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(error, logDir, emptyList()),
env.cluster().nodeById(0));
DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
assertEquals(brokers, deprecatedValues.keySet());
assertNotNull(deprecatedValues.get(0));
Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
assertEquals(Collections.singleton(logDir), valuesMap.keySet());
assertEquals(error, valuesMap.get(logDir).error);
assertEquals(emptySet(), valuesMap.get(logDir).replicaInfos.keySet());
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
assertEquals(brokers, deprecatedAll.keySet());
Map<String, DescribeLogDirsResponse.LogDirInfo> allMap = deprecatedAll.get(0);
assertNotNull(allMap);
assertEquals(Collections.singleton(logDir), allMap.keySet());
assertEquals(error, allMap.get(logDir).error);
assertEquals(emptySet(), allMap.get(logDir).replicaInfos.keySet());
}
}
@Test
public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException {
TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 12, 1);
TopicPartitionReplica tpr2 = new TopicPartitionReplica("topic", 12, 2);
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
String broker1log0 = "/var/data/kafka0";
String broker1log1 = "/var/data/kafka1";
String broker2log0 = "/var/data/kafka2";
int broker1Log0OffsetLag = 24;
int broker1Log0PartitionSize = 987654321;
int broker1Log1PartitionSize = 123456789;
int broker1Log1OffsetLag = 4321;
env.kafkaClient().prepareResponseFrom(
new DescribeLogDirsResponse(
new DescribeLogDirsResponseData().setResults(asList(
prepareDescribeLogDirsResult(tpr1, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false),
prepareDescribeLogDirsResult(tpr1, broker1log1, broker1Log1PartitionSize, broker1Log1OffsetLag, true)))),
env.cluster().nodeById(tpr1.brokerId()));
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(Errors.KAFKA_STORAGE_ERROR, broker2log0),
env.cluster().nodeById(tpr2.brokerId()));
DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(tpr1, tpr2));
Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values();
assertEquals(TestUtils.toSet(asList(tpr1, tpr2)), values.keySet());
assertNotNull(values.get(tpr1));
assertEquals(broker1log0, values.get(tpr1).get().getCurrentReplicaLogDir());
assertEquals(broker1Log0OffsetLag, values.get(tpr1).get().getCurrentReplicaOffsetLag());
assertEquals(broker1log1, values.get(tpr1).get().getFutureReplicaLogDir());
assertEquals(broker1Log1OffsetLag, values.get(tpr1).get().getFutureReplicaOffsetLag());
assertNotNull(values.get(tpr2));
assertNull(values.get(tpr2).get().getCurrentReplicaLogDir());
assertEquals(-1, values.get(tpr2).get().getCurrentReplicaOffsetLag());
assertNull(values.get(tpr2).get().getFutureReplicaLogDir());
assertEquals(-1, values.get(tpr2).get().getFutureReplicaOffsetLag());
}
}
private static DescribeLogDirsResponseData.DescribeLogDirsResult prepareDescribeLogDirsResult(TopicPartitionReplica tpr, String logDir, int partitionSize, int offsetLag, boolean isFuture) {
return new DescribeLogDirsResponseData.DescribeLogDirsResult()
.setErrorCode(Errors.NONE.code())
.setLogDir(logDir)
.setTopics(prepareDescribeLogDirsTopics(partitionSize, offsetLag, tpr.topic(), tpr.partition(), isFuture));
}
@Test
public void testDescribeReplicaLogDirsUnexpected() throws ExecutionException, InterruptedException {
TopicPartitionReplica expected = new TopicPartitionReplica("topic", 12, 1);
TopicPartitionReplica unexpected = new TopicPartitionReplica("topic", 12, 2);
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
String broker1log0 = "/var/data/kafka0";
String broker1log1 = "/var/data/kafka1";
int broker1Log0PartitionSize = 987654321;
int broker1Log0OffsetLag = 24;
int broker1Log1PartitionSize = 123456789;
int broker1Log1OffsetLag = 4321;
env.kafkaClient().prepareResponseFrom(
new DescribeLogDirsResponse(
new DescribeLogDirsResponseData().setResults(asList(
prepareDescribeLogDirsResult(expected, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false),
prepareDescribeLogDirsResult(unexpected, broker1log1, broker1Log1PartitionSize, broker1Log1OffsetLag, true)))),
env.cluster().nodeById(expected.brokerId()));
DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(expected));
Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values();
assertEquals(TestUtils.toSet(asList(expected)), values.keySet());
assertNotNull(values.get(expected));
assertEquals(broker1log0, values.get(expected).get().getCurrentReplicaLogDir());
assertEquals(broker1Log0OffsetLag, values.get(expected).get().getCurrentReplicaOffsetLag());
assertEquals(broker1log1, values.get(expected).get().getFutureReplicaLogDir());
assertEquals(broker1Log1OffsetLag, values.get(expected).get().getFutureReplicaOffsetLag());
}
}
@Test
public void testCreatePartitions() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -4187,8 +4446,8 @@ public class KafkaAdminClientTest {
            DescribeLogDirsResult result = env.adminClient().describeLogDirs(Arrays.asList(0, 1));
-            TestUtils.assertFutureThrows(result.values().get(0), ApiException.class);
-            assertNotNull(result.values().get(1).get());
+            TestUtils.assertFutureThrows(result.descriptions().get(0), ApiException.class);
+            assertNotNull(result.descriptions().get(1).get());
        }
    }

core/src/main/scala/kafka/admin/LogDirsCommand.scala (9 lines changed)

@@ -21,8 +21,7 @@ import java.io.PrintStream
import java.util.Properties

import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult}
-import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult, LogDirDescription}
import org.apache.kafka.common.utils.Utils

import scala.jdk.CollectionConverters._
@@ -48,14 +47,14 @@ object LogDirsCommand {
    out.println("Querying brokers for log directories information")
    val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-    val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.map { case (k, v) => k -> v.asScala }
+    val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }

    out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
    out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
    adminClient.close()
  }

-  private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = {
+  private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirDescription]], topicSet: Set[String]): String = {
    Json.encodeAsString(Map(
      "version" -> 1,
      "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
@@ -64,7 +63,7 @@ object LogDirsCommand {
        "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
          Map(
            "logDir" -> logDir,
-            "error" -> logDirInfo.error.exceptionName(),
+            "error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull,
            "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) =>
              topicSet.isEmpty || topicSet.contains(topicPartition.topic)
            }.map { case (topicPartition, replicaInfo) =>

core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala (2 lines changed)

@@ -490,7 +490,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
    val logDirs = new mutable.HashSet[String]
    val curLogDirs = new mutable.HashMap[TopicPartition, String]
    val futureLogDirs = new mutable.HashMap[TopicPartition, String]
-    result.values.get(brokerId).get().forEach {
+    result.descriptions.get(brokerId).get().forEach {
      case (logDirName, logDirInfo) => {
        logDirs.add(logDirName)
        logDirInfo.replicaInfos.forEach {

core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala (2 lines changed)

@@ -197,7 +197,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
      .find(x => x.topicName == tp.topic).get.partitions.asScala
      .find(p => p.partitionIndex == tp.partition).get.errorCode)),
    ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
-      if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
+      if (resp.data.results.size() > 0) Errors.forCode(resp.data.results.get(0).errorCode) else Errors.CLUSTER_AUTHORIZATION_FAILED),
    ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => Errors.forCode(resp.data.results.asScala.head.errorCode())),
    ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())),
    ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => {

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala (2 lines changed)

@@ -176,7 +176,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
      k -> v.keys.toSeq
    }
    val brokers = (0 until brokerCount).map(Integer.valueOf)
-    val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
+    val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).allDescriptions.get

    (0 until brokerCount).foreach { brokerId =>
      val server = servers.find(_.config.brokerId == brokerId).get

core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala (26 lines changed)

@@ -27,6 +27,8 @@ import org.apache.kafka.common.requests._
import org.junit.Assert._
import org.junit.Test

+import scala.jdk.CollectionConverters._
+
class DescribeLogDirsRequestTest extends BaseRequestTest {
  override val logDirCount = 2
  override val brokerCount: Int = 1
@@ -46,19 +48,25 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
    val request = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null)).build()
    val response = connectAndReceive[DescribeLogDirsResponse](request, destination = controllerSocketServer)

-    val logDirInfos = response.logDirInfos()
-    assertEquals(logDirCount, logDirInfos.size())
-    assertEquals(Errors.KAFKA_STORAGE_ERROR, logDirInfos.get(offlineDir).error)
-    assertEquals(0, logDirInfos.get(offlineDir).replicaInfos.size())
+    assertEquals(logDirCount, response.data.results.size)
+    val offlineResult = response.data.results.asScala.find(logDirResult => logDirResult.logDir == offlineDir).get
+    assertEquals(Errors.KAFKA_STORAGE_ERROR.code, offlineResult.errorCode)
+    assertEquals(0, offlineResult.topics.asScala.map(t => t.partitions().size()).sum)

-    assertEquals(Errors.NONE, logDirInfos.get(onlineDir).error)
-    val replicaInfo0 = logDirInfos.get(onlineDir).replicaInfos.get(tp0)
-    val replicaInfo1 = logDirInfos.get(onlineDir).replicaInfos.get(tp1)
+    val onlineResult = response.data.results.asScala.find(logDirResult => logDirResult.logDir == onlineDir).get
+    assertEquals(Errors.NONE.code, onlineResult.errorCode)
+    val onlinePartitionsMap = onlineResult.topics.asScala.flatMap { topic =>
+      topic.partitions().asScala.map { partitionResult =>
+        new TopicPartition(topic.name, partitionResult.partitionIndex) -> partitionResult
+      }
+    }.toMap
+    val replicaInfo0 = onlinePartitionsMap(tp0)
+    val replicaInfo1 = onlinePartitionsMap(tp1)
    val log0 = servers.head.logManager.getLog(tp0).get
    val log1 = servers.head.logManager.getLog(tp1).get
-    assertEquals(log0.size, replicaInfo0.size)
-    assertEquals(log1.size, replicaInfo1.size)
+    assertEquals(log0.size, replicaInfo0.partitionSize)
+    assertEquals(log1.size, replicaInfo1.partitionSize)
    val logEndOffset = servers.head.logManager.getLog(tp0).get.logEndOffset
    assertTrue(s"LogEndOffset '$logEndOffset' should be > 0", logEndOffset > 0)
    assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp0, log0.logEndOffset, false), replicaInfo0.offsetLag)

streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java (15 lines changed)

@@ -18,11 +18,12 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
@@ -120,13 +121,13 @@ public class PurgeRepartitionTopicIntegrationTest {
            time.sleep(PURGE_INTERVAL_MS);

            try {
-                final Collection<DescribeLogDirsResponse.LogDirInfo> logDirInfo =
-                    adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values();
+                final Collection<LogDirDescription> logDirInfo =
+                    adminClient.describeLogDirs(Collections.singleton(0)).descriptions().get(0).get().values();

-                for (final DescribeLogDirsResponse.LogDirInfo partitionInfo : logDirInfo) {
-                    final DescribeLogDirsResponse.ReplicaInfo replicaInfo =
-                        partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0));
-                    if (replicaInfo != null && verifier.verify(replicaInfo.size)) {
+                for (final LogDirDescription partitionInfo : logDirInfo) {
+                    final ReplicaInfo replicaInfo =
+                        partitionInfo.replicaInfos().get(new TopicPartition(REPARTITION_TOPIC, 0));
+                    if (replicaInfo != null && verifier.verify(replicaInfo.size())) {
                        return true;
                    }
                }
