diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
index 9bcc2fb29fe..96a81f08b02 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java
@@ -23,7 +23,11 @@ import java.util.HashMap;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;

 /**
@@ -33,38 +37,82 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
  */
 @InterfaceStability.Evolving
 public class DescribeLogDirsResult {
-    private final Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> futures;
+    private final Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> futures;

-    DescribeLogDirsResult(Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> futures) {
+    DescribeLogDirsResult(Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> futures) {
         this.futures = futures;
     }

     /**
-     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker
+     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker.
+     * @deprecated Deprecated Since Kafka 2.7. Use {@link #descriptions()}.
      */
-    public Map<Integer, KafkaFuture<Map<String, LogDirInfo>>> values() {
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> values() {
+        return descriptions().entrySet().stream()
+                .collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        entry -> entry.getValue().thenApply(map -> convertMapValues(map))));
+    }
+
+    @SuppressWarnings("deprecation")
+    private Map<String, DescribeLogDirsResponse.LogDirInfo> convertMapValues(Map<String, LogDirDescription> map) {
+        Stream<Map.Entry<String, LogDirDescription>> stream = map.entrySet().stream();
+        return stream.collect(Collectors.toMap(
+                Map.Entry::getKey,
+                infoEntry -> {
+                    LogDirDescription logDir = infoEntry.getValue();
+                    return new DescribeLogDirsResponse.LogDirInfo(logDir.error() == null ? Errors.NONE : Errors.forException(logDir.error()),
+                            logDir.replicaInfos().entrySet().stream().collect(Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    replicaEntry -> new DescribeLogDirsResponse.ReplicaInfo(
+                                            replicaEntry.getValue().size(),
+                                            replicaEntry.getValue().offsetLag(),
+                                            replicaEntry.getValue().isFuture())
+                            )));
+                }));
+    }
+
+    /**
+     * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker.
+     * The result of the future is a map from broker log directory path to a description of that log directory.
+     */
+    public Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions() {
         return futures;
     }

     /**
      * Return a future which succeeds only if all the brokers have responded without error
+     * @deprecated Deprecated Since Kafka 2.7. Use {@link #allDescriptions()}.
+     */
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public KafkaFuture<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> all() {
+        return allDescriptions().thenApply(map -> map.entrySet().stream().collect(Collectors.toMap(
+                entry -> entry.getKey(),
+                entry -> convertMapValues(entry.getValue())
+        )));
+    }
+
+    /**
+     * Return a future which succeeds only if all the brokers have responded without error.
+     * The result of the future is a map from brokerId to a map from broker log directory path
+     * to a description of that log directory.
      */
-    public KafkaFuture<Map<Integer, Map<String, LogDirInfo>>> all() {
+    public KafkaFuture<Map<Integer, Map<String, LogDirDescription>>> allDescriptions() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.BaseFunction<Void, Map<Integer, Map<String, LogDirInfo>>>() {
-                @Override
-                public Map<Integer, Map<String, LogDirInfo>> apply(Void v) {
-                    Map<Integer, Map<String, LogDirInfo>> descriptions = new HashMap<>(futures.size());
-                    for (Map.Entry<Integer, KafkaFuture<Map<String, LogDirInfo>>> entry : futures.entrySet()) {
-                        try {
-                            descriptions.put(entry.getKey(), entry.getValue().get());
-                        } catch (InterruptedException | ExecutionException e) {
-                            // This should be unreachable, because allOf ensured that all the futures completed successfully.
-                            throw new RuntimeException(e);
-                        }
+            thenApply(v -> {
+                Map<Integer, Map<String, LogDirDescription>> descriptions = new HashMap<>(futures.size());
+                for (Map.Entry<Integer, KafkaFuture<Map<String, LogDirDescription>>> entry : futures.entrySet()) {
+                    try {
+                        descriptions.put(entry.getKey(), entry.getValue().get());
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                        throw new RuntimeException(e);
                     }
-                    return descriptions;
                 }
+                return descriptions;
             });
     }
 }
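For callers migrating off the deprecated accessors, the change is mechanical: `descriptions()`/`allDescriptions()` replace `values()`/`all()`, and each directory now carries a nullable `ApiException` instead of an `Errors` code. A minimal sketch of the new call path (not part of this patch; the bootstrap address and broker id are illustrative):

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.LogDirDescription;

import static java.util.Collections.singleton;

public class DescribeLogDirsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address

        try (Admin admin = Admin.create(props)) {
            DescribeLogDirsResult result = admin.describeLogDirs(singleton(0)); // broker id 0, illustrative

            // Deprecated path: result.all().get() returned Map<Integer, Map<String,
            // DescribeLogDirsResponse.LogDirInfo>>, whose error field was an Errors code.
            // New path: LogDirDescription, whose error() is an ApiException or null on success.
            Map<Integer, Map<String, LogDirDescription>> byBroker = result.allDescriptions().get();
            byBroker.forEach((brokerId, dirs) ->
                dirs.forEach((dir, description) ->
                    System.out.printf("broker %d, dir %s: online=%b%n",
                        brokerId, dir, description.error() == null)));
        }
    }
}
```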
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2603d50e9a7..ebdaccf1dfc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -61,6 +61,7 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -110,6 +111,7 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.DescribeLogDirsRequestData;
 import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData;
 import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
@@ -2311,11 +2313,11 @@ public class KafkaAdminClient extends AdminClient {

     @Override
     public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
-        final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size());
+        final Map<Integer, KafkaFutureImpl<Map<String, LogDirDescription>>> futures = new HashMap<>(brokers.size());

         final long now = time.milliseconds();
         for (final Integer brokerId : brokers) {
-            KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future = new KafkaFutureImpl<>();
+            KafkaFutureImpl<Map<String, LogDirDescription>> future = new KafkaFutureImpl<>();
             futures.put(brokerId, future);

             runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()),
@@ -2327,13 +2329,15 @@ public class KafkaAdminClient extends AdminClient {
                     return new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null));
                 }

+                @SuppressWarnings("deprecation")
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                    if (response.logDirInfos().size() > 0) {
-                        future.complete(response.logDirInfos());
+                    Map<String, LogDirDescription> descriptions = logDirDescriptions(response);
+                    if (descriptions.size() > 0) {
+                        future.complete(descriptions);
                     } else {
-                        // response.logDirInfos() will be empty if and only if the user is not authorized to describe clsuter resource.
+                        // descriptions will be empty if and only if the user is not authorized to describe cluster resource.
                         future.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
                     }
                 }
@@ -2347,6 +2351,22 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeLogDirsResult(new HashMap<>(futures));
     }

+    private static Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) {
+        Map<String, LogDirDescription> result = new HashMap<>(response.data().results().size());
+        for (DescribeLogDirsResponseData.DescribeLogDirsResult logDirResult : response.data().results()) {
+            Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>();
+            for (DescribeLogDirsResponseData.DescribeLogDirsTopic t : logDirResult.topics()) {
+                for (DescribeLogDirsResponseData.DescribeLogDirsPartition p : t.partitions()) {
+                    replicaInfoMap.put(
+                            new TopicPartition(t.name(), p.partitionIndex()),
+                            new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
+                }
+            }
+            result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap));
+        }
+        return result;
+    }
+
     @Override
     public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) {
         final Map<TopicPartitionReplica, KafkaFutureImpl<ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
@@ -2395,32 +2415,31 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> responseEntry: response.logDirInfos().entrySet()) {
+                    for (Map.Entry<String, LogDirDescription> responseEntry: logDirDescriptions(response).entrySet()) {
                         String logDir = responseEntry.getKey();
-                        DescribeLogDirsResponse.LogDirInfo logDirInfo = responseEntry.getValue();
+                        LogDirDescription logDirInfo = responseEntry.getValue();

                         // No replica info will be provided if the log directory is offline
-                        if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR)
+                        if (logDirInfo.error() instanceof KafkaStorageException)
                             continue;
-                        if (logDirInfo.error != Errors.NONE)
+                        if (logDirInfo.error() != null)
                             handleFailure(new IllegalStateException(
-                                "The error " + logDirInfo.error + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));
+                                "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal"));

-                        for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos.entrySet()) {
+                        for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) {
                             TopicPartition tp = replicaInfoEntry.getKey();
-                            DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
+                            ReplicaInfo replicaInfo = replicaInfoEntry.getValue();

                             ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
                             if (replicaLogDirInfo == null) {
-                                handleFailure(new IllegalStateException(
-                                    "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
-                            } else if (replicaInfo.isFuture) {
+                                log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp);
+                            } else if (replicaInfo.isFuture()) {
                                 replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
                                                                                         replicaLogDirInfo.getCurrentReplicaOffsetLag(),
                                                                                         logDir,
-                                                                                        replicaInfo.offsetLag));
+                                                                                        replicaInfo.offsetLag()));
                             } else {
                                 replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir,
-                                                                                        replicaInfo.offsetLag,
+                                                                                        replicaInfo.offsetLag(),
                                                                                         replicaLogDirInfo.getFutureReplicaLogDir(),
                                                                                         replicaLogDirInfo.getFutureReplicaOffsetLag()));
                             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
new file mode 100644
index 00000000000..1c326ec43b9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+        this.error = error;
+        this.replicaInfos = replicaInfos;
+    }
+
+    /**
+     * Returns `ApiException` if the log directory is offline or an error occurred, otherwise returns null.
+     * <ul>
+     * <li> KafkaStorageException - The log directory is offline.
+     * <li> UnknownServerException - The server experienced an unexpected error when processing the request.
+     * </ul>
+     */
+    public ApiException error() {
+        return error;
+    }
+
+    /**
+     * A map from topic partition to replica information for that partition
+     * in this log directory.
+     */
+    public Map<TopicPartition, ReplicaInfo> replicaInfos() {
+        return unmodifiableMap(replicaInfos);
+    }
+
+    @Override
+    public String toString() {
+        return "LogDirDescription(" +
+                "replicaInfos=" + replicaInfos +
+                ", error=" + error +
+                ')';
+    }
+}
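The error contract above (null means online, KafkaStorageException means offline, any other ApiException means an unexpected server-side failure) replaces the old `Errors` enum. A small sketch of how a caller might branch on it (illustrative, not part of this patch):

```java
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.common.errors.KafkaStorageException;

public final class LogDirHealth {
    private LogDirHealth() { }

    // Follows the contract documented on LogDirDescription#error(): null means online,
    // KafkaStorageException means the directory is offline, anything else is unexpected.
    public static String classify(LogDirDescription description) {
        if (description.error() == null) {
            return "online, " + description.replicaInfos().size() + " replica(s)";
        }
        if (description.error() instanceof KafkaStorageException) {
            return "offline";
        }
        return "unexpected error: " + description.error().getClass().getSimpleName();
    }
}
```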
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ReplicaInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/ReplicaInfo.java
new file mode 100644
index 00000000000..b77375d5960
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ReplicaInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+/**
+ * A description of a replica on a particular broker.
+ */
+public class ReplicaInfo {
+
+    private final long size;
+    private final long offsetLag;
+    private final boolean isFuture;
+
+    public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
+        this.size = size;
+        this.offsetLag = offsetLag;
+        this.isFuture = isFuture;
+    }
+
+    /**
+     * The total size of the log segments in this replica in bytes.
+     */
+    public long size() {
+        return size;
+    }
+
+    /**
+     * The lag of the log's LEO with respect to the partition's
+     * high watermark (if it is the current log for the partition)
+     * or the current replica's LEO (if it is the {@linkplain #isFuture() future log}
+     * for the partition).
+     */
+    public long offsetLag() {
+        return offsetLag;
+    }
+
+    /**
+     * Whether this replica has been created by an AlterReplicaLogDirsRequest
+     * but not yet replaced the current replica on the broker.
+     *
+     * @return true if this log is created by AlterReplicaLogDirsRequest and will replace the current log
+     * of the replica at some time in the future.
+     */
+    public boolean isFuture() {
+        return isFuture;
+    }
+
+    @Override
+    public String toString() {
+        return "ReplicaInfo(" +
+                "size=" + size +
+                ", offsetLag=" + offsetLag +
+                ", isFuture=" + isFuture +
+                ')';
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index b744769b43d..e26fc554dcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -19,9 +19,6 @@ package org.apache.kafka.common.requests;

 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.DescribeLogDirsResponseData;
-import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsPartition;
-import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsResult;
-import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -68,22 +65,6 @@ public class DescribeLogDirsResponse extends AbstractResponse {
         return errorCounts;
     }

-    public Map<String, LogDirInfo> logDirInfos() {
-        HashMap<String, LogDirInfo> result = new HashMap<>(data.results().size());
-        for (DescribeLogDirsResult logDirResult : data.results()) {
-            Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>();
-            for (DescribeLogDirsTopic t : logDirResult.topics()) {
-                for (DescribeLogDirsPartition p : t.partitions()) {
-                    replicaInfoMap.put(
-                        new TopicPartition(t.name(), p.partitionIndex()),
-                        new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
-                }
-            }
-            result.put(logDirResult.logDir(), new LogDirInfo(Errors.forCode(logDirResult.errorCode()), replicaInfoMap));
-        }
-        return result;
-    }
-
     public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) {
         return new DescribeLogDirsResponse(ApiKeys.DESCRIBE_LOG_DIRS.responseSchema(version).read(buffer), version);
     }
@@ -94,7 +75,13 @@ public class DescribeLogDirsResponse extends AbstractResponse {
      *
      * KAFKA_STORAGE_ERROR (56)
      * UNKNOWN (-1)
+     *
+     * @deprecated Deprecated Since Kafka 2.7.
+     * Use {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()}
+     * and {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to access the replacement
+     * class {@link org.apache.kafka.clients.admin.LogDirDescription}.
      */
+    @Deprecated
     static public class LogDirInfo {
         public final Errors error;
         public final Map<TopicPartition, ReplicaInfo> replicaInfos;
@@ -117,6 +104,14 @@ public class DescribeLogDirsResponse extends AbstractResponse {
     }

     // Note this class is part of the public API, reachable from Admin.describeLogDirs()
+
+    /**
+     * @deprecated Deprecated Since Kafka 2.7.
+     * Use {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()}
+     * and {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to access the replacement
+     * class {@link org.apache.kafka.clients.admin.ReplicaInfo}.
+     */
+    @Deprecated
     static public class ReplicaInfo {

         public final long size;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 52a6efad63c..305e6bfda5c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -89,6 +89,7 @@ import org.apache.kafka.common.message.DescribeConfigsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.DescribeLogDirsResponseData;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
@@ -182,6 +183,7 @@ import java.util.stream.Stream;

 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
 import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
@@ -1350,6 +1352,263 @@ public class KafkaAdminClientTest {
         }
     }

+    private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+                prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
+    }
+
+    private static List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
+            long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) {
+        return singletonList(new DescribeLogDirsTopic()
+                .setName(topic)
+                .setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                        .setPartitionIndex(partition)
+                        .setPartitionSize(partitionSize)
+                        .setIsFutureKey(isFuture)
+                        .setOffsetLag(offsetLag))));
+    }
+
+    private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,
+                                                                          List<DescribeLogDirsTopic> topics) {
+        return new DescribeLogDirsResponse(
+                new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                        .setErrorCode(error.code())
+                        .setLogDir(logDir)
+                        .setTopics(topics)
+                )));
+    }
+
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        String logDir = "/var/data/kafka";
+        TopicPartition tp = new TopicPartition("topic", 12);
+        long partitionSize = 1234567890;
+        long offsetLag = 24;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, partitionSize, offsetLag),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertDescriptionContains(descriptions.get(0).get(), logDir, tp, partitionSize, offsetLag);
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            assertDescriptionContains(allDescriptions.get(0), logDir, tp, partitionSize, offsetLag);
+        }
+    }
+
+    private static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
+                                                  TopicPartition tp, long partitionSize, long offsetLag) {
+        assertNotNull(descriptionsMap);
+        assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
+        assertNull(descriptionsMap.get(logDir).error());
+        Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get(logDir).replicaInfos();
+        assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
+        assertEquals(partitionSize, descriptionsReplicaInfos.get(tp).size());
+        assertEquals(offsetLag, descriptionsReplicaInfos.get(tp).offsetLag());
+        assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+        String logDir = "/var/data/kafka";
+        Errors error = Errors.NONE;
+        int offsetLag = 24;
+        long partitionSize = 1234567890;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(error, logDir, tp, partitionSize, offsetLag),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(brokers, deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            assertDescriptionContains(deprecatedValues.get(0).get(), logDir, tp, error, offsetLag, partitionSize);
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(brokers, deprecatedAll.keySet());
+            assertDescriptionContains(deprecatedAll.get(0), logDir, tp, error, offsetLag, partitionSize);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private static void assertDescriptionContains(Map<String, DescribeLogDirsResponse.LogDirInfo> descriptionsMap,
+                                                  String logDir, TopicPartition tp, Errors error,
+                                                  int offsetLag, long partitionSize) {
+        assertNotNull(descriptionsMap);
+        assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
+        assertEquals(error, descriptionsMap.get(logDir).error);
+        Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> allReplicaInfos =
+                descriptionsMap.get(logDir).replicaInfos;
+        assertEquals(Collections.singleton(tp), allReplicaInfos.keySet());
+        assertEquals(partitionSize, allReplicaInfos.get(tp).size);
+        assertEquals(offsetLag, allReplicaInfos.get(tp).offsetLag);
+        assertFalse(allReplicaInfos.get(tp).isFuture);
+    }
+
+    @Test
+    public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        String logDir = "/var/data/kafka";
+        Errors error = Errors.KAFKA_STORAGE_ERROR;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(error, logDir, emptyList()),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            Map<String, LogDirDescription> descriptionsMap = descriptions.get(0).get();
+            assertEquals(Collections.singleton(logDir), descriptionsMap.keySet());
+            assertEquals(error.exception().getClass(), descriptionsMap.get(logDir).error().getClass());
+            assertEquals(emptySet(), descriptionsMap.get(logDir).replicaInfos().keySet());
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            Map<String, LogDirDescription> allMap = allDescriptions.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton(logDir), allMap.keySet());
+            assertEquals(error.exception().getClass(), allMap.get(logDir).error().getClass());
+            assertEquals(emptySet(), allMap.get(logDir).replicaInfos().keySet());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = Collections.singleton(0);
+        String logDir = "/var/data/kafka";
+        Errors error = Errors.KAFKA_STORAGE_ERROR;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(error, logDir, emptyList()),
+                    env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
+            assertEquals(brokers, deprecatedValues.keySet());
+            assertNotNull(deprecatedValues.get(0));
+            Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
+            assertEquals(Collections.singleton(logDir), valuesMap.keySet());
+            assertEquals(error, valuesMap.get(logDir).error);
+            assertEquals(emptySet(), valuesMap.get(logDir).replicaInfos.keySet());
+
+            Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
+            assertEquals(brokers, deprecatedAll.keySet());
+            Map<String, DescribeLogDirsResponse.LogDirInfo> allMap = deprecatedAll.get(0);
+            assertNotNull(allMap);
+            assertEquals(Collections.singleton(logDir), allMap.keySet());
+            assertEquals(error, allMap.get(logDir).error);
+            assertEquals(emptySet(), allMap.get(logDir).replicaInfos.keySet());
+        }
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException {
+        TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 12, 1);
+        TopicPartitionReplica tpr2 = new TopicPartitionReplica("topic", 12, 2);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            String broker1log1 = "/var/data/kafka1";
+            String broker2log0 = "/var/data/kafka2";
+            int broker1Log0OffsetLag = 24;
+            int broker1Log0PartitionSize = 987654321;
+            int broker1Log1PartitionSize = 123456789;
+            int broker1Log1OffsetLag = 4321;
+            env.kafkaClient().prepareResponseFrom(
+                    new DescribeLogDirsResponse(
+                            new DescribeLogDirsResponseData().setResults(asList(
+                                    prepareDescribeLogDirsResult(tpr1, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false),
+                                    prepareDescribeLogDirsResult(tpr1, broker1log1, broker1Log1PartitionSize, broker1Log1OffsetLag, true)))),
+                    env.cluster().nodeById(tpr1.brokerId()));
+            env.kafkaClient().prepareResponseFrom(
+                    prepareDescribeLogDirsResponse(Errors.KAFKA_STORAGE_ERROR, broker2log0, emptyList()),
+                    env.cluster().nodeById(tpr2.brokerId()));
+
+            DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(tpr1, tpr2));
+
+            Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values();
+            assertEquals(TestUtils.toSet(asList(tpr1, tpr2)), values.keySet());
+
+            assertNotNull(values.get(tpr1));
+            assertEquals(broker1log0, values.get(tpr1).get().getCurrentReplicaLogDir());
+            assertEquals(broker1Log0OffsetLag,
+                    values.get(tpr1).get().getCurrentReplicaOffsetLag());
+            assertEquals(broker1log1, values.get(tpr1).get().getFutureReplicaLogDir());
+            assertEquals(broker1Log1OffsetLag, values.get(tpr1).get().getFutureReplicaOffsetLag());
+
+            assertNotNull(values.get(tpr2));
+            assertNull(values.get(tpr2).get().getCurrentReplicaLogDir());
+            assertEquals(-1, values.get(tpr2).get().getCurrentReplicaOffsetLag());
+            assertNull(values.get(tpr2).get().getFutureReplicaLogDir());
+            assertEquals(-1, values.get(tpr2).get().getFutureReplicaOffsetLag());
+        }
+    }
+
+    private static DescribeLogDirsResponseData.DescribeLogDirsResult prepareDescribeLogDirsResult(TopicPartitionReplica tpr, String logDir, int partitionSize, int offsetLag, boolean isFuture) {
+        return new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                .setErrorCode(Errors.NONE.code())
+                .setLogDir(logDir)
+                .setTopics(prepareDescribeLogDirsTopics(partitionSize, offsetLag, tpr.topic(), tpr.partition(), isFuture));
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirsUnexpected() throws ExecutionException, InterruptedException {
+        TopicPartitionReplica expected = new TopicPartitionReplica("topic", 12, 1);
+        TopicPartitionReplica unexpected = new TopicPartitionReplica("topic", 12, 2);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            String broker1log1 = "/var/data/kafka1";
+            int broker1Log0PartitionSize = 987654321;
+            int broker1Log0OffsetLag = 24;
+            int broker1Log1PartitionSize = 123456789;
+            int broker1Log1OffsetLag = 4321;
+            env.kafkaClient().prepareResponseFrom(
+                    new DescribeLogDirsResponse(
+                            new DescribeLogDirsResponseData().setResults(asList(
+                                    prepareDescribeLogDirsResult(expected, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false),
+                                    prepareDescribeLogDirsResult(unexpected, broker1log1, broker1Log1PartitionSize, broker1Log1OffsetLag, true)))),
+                    env.cluster().nodeById(expected.brokerId()));
+
+            DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(expected));
+
+            Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values();
+            assertEquals(TestUtils.toSet(asList(expected)), values.keySet());
+
+            assertNotNull(values.get(expected));
+            assertEquals(broker1log0, values.get(expected).get().getCurrentReplicaLogDir());
+            assertEquals(broker1Log0OffsetLag, values.get(expected).get().getCurrentReplicaOffsetLag());
+            assertEquals(broker1log1, values.get(expected).get().getFutureReplicaLogDir());
+            assertEquals(broker1Log1OffsetLag, values.get(expected).get().getFutureReplicaOffsetLag());
+        }
+    }
+
     @Test
     public void testCreatePartitions() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -4187,8 +4446,8 @@ public class KafkaAdminClientTest {

             DescribeLogDirsResult result = env.adminClient().describeLogDirs(Arrays.asList(0, 1));

-            TestUtils.assertFutureThrows(result.values().get(0), ApiException.class);
-            assertNotNull(result.values().get(1).get());
+            TestUtils.assertFutureThrows(result.descriptions().get(0), ApiException.class);
+            assertNotNull(result.descriptions().get(1).get());
         }
     }

diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index 6e4576274ed..ad0307f31b9 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -21,8 +21,7 @@ import java.io.PrintStream
 import java.util.Properties

 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult}
-import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult, LogDirDescription}
 import org.apache.kafka.common.utils.Utils

 import scala.jdk.CollectionConverters._
@@ -48,14 +47,14 @@ object LogDirsCommand {
             out.println("Querying brokers for log directories information")
             val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-            val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.map { case (k, v) => k -> v.asScala }
+            val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }

             out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
             out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
             adminClient.close()
     }

-    private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = {
+    private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirDescription]], topicSet: Set[String]): String = {
         Json.encodeAsString(Map(
             "version" -> 1,
             "brokers" -> logDirInfosByBroker.map { case (broker, logDirInfos) =>
@@ -64,7 +63,7 @@ object LogDirsCommand {
                 "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
                     Map(
                         "logDir" -> logDir,
-                        "error" -> logDirInfo.error.exceptionName(),
+                        "error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull,
                         "partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) =>
                             topicSet.isEmpty || topicSet.contains(topicPartition.topic)
                         }.map { case (topicPartition, replicaInfo) =>
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 974877b7afb..e83aec44af2 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -490,7 +490,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
     val logDirs = new mutable.HashSet[String]
     val curLogDirs = new mutable.HashMap[TopicPartition, String]
     val futureLogDirs = new mutable.HashMap[TopicPartition, String]
-    result.values.get(brokerId).get().forEach {
+    result.descriptions.get(brokerId).get().forEach {
       case (logDirName, logDirInfo) => {
         logDirs.add(logDirName)
         logDirInfo.replicaInfos.forEach {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 6b98dc6e3b4..863fb43b530 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -197,7 +197,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       .find(x => x.topicName == tp.topic).get.partitions.asScala
      .find(p => p.partitionIndex == tp.partition).get.errorCode)),
     ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
-      if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
+      if (resp.data.results.size() > 0) Errors.forCode(resp.data.results.get(0).errorCode) else Errors.CLUSTER_AUTHORIZATION_FAILED),
     ApiKeys.CREATE_PARTITIONS -> ((resp:
 CreatePartitionsResponse) => Errors.forCode(resp.data.results.asScala.head.errorCode())),
     ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())),
     ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 136e2a03830..31c0426330a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -176,7 +176,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       k -> v.keys.toSeq
     }
     val brokers = (0 until brokerCount).map(Integer.valueOf)
-    val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
+    val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).allDescriptions.get

     (0 until brokerCount).foreach { brokerId =>
       val server = servers.find(_.config.brokerId == brokerId).get
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index e8bad37a9c0..13de19f375d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -27,6 +27,8 @@ import org.apache.kafka.common.requests._
 import org.junit.Assert._
 import org.junit.Test

+import scala.jdk.CollectionConverters._
+
 class DescribeLogDirsRequestTest extends BaseRequestTest {
   override val logDirCount = 2
   override val brokerCount: Int = 1
@@ -46,19 +48,25 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
     val request = new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null)).build()
     val response = connectAndReceive[DescribeLogDirsResponse](request, destination = controllerSocketServer)

-    val logDirInfos = response.logDirInfos()
-    assertEquals(logDirCount, logDirInfos.size())
-    assertEquals(Errors.KAFKA_STORAGE_ERROR, logDirInfos.get(offlineDir).error)
-    assertEquals(0, logDirInfos.get(offlineDir).replicaInfos.size())
+    assertEquals(logDirCount, response.data.results.size)
+    val offlineResult = response.data.results.asScala.find(logDirResult => logDirResult.logDir == offlineDir).get
+    assertEquals(Errors.KAFKA_STORAGE_ERROR.code, offlineResult.errorCode)
+    assertEquals(0, offlineResult.topics.asScala.map(t => t.partitions().size()).sum)

-    assertEquals(Errors.NONE, logDirInfos.get(onlineDir).error)
-    val replicaInfo0 = logDirInfos.get(onlineDir).replicaInfos.get(tp0)
-    val replicaInfo1 = logDirInfos.get(onlineDir).replicaInfos.get(tp1)
+    val onlineResult = response.data.results.asScala.find(logDirResult => logDirResult.logDir == onlineDir).get
+    assertEquals(Errors.NONE.code, onlineResult.errorCode)
+    val onlinePartitionsMap = onlineResult.topics.asScala.flatMap { topic =>
+      topic.partitions().asScala.map { partitionResult =>
+        new TopicPartition(topic.name, partitionResult.partitionIndex) -> partitionResult
+      }
+    }.toMap
+    val replicaInfo0 = onlinePartitionsMap(tp0)
+    val replicaInfo1 = onlinePartitionsMap(tp1)
     val log0 = servers.head.logManager.getLog(tp0).get
     val log1 = servers.head.logManager.getLog(tp1).get
-    assertEquals(log0.size, replicaInfo0.size)
-    assertEquals(log1.size, replicaInfo1.size)
+    assertEquals(log0.size, replicaInfo0.partitionSize)
+    assertEquals(log1.size, replicaInfo1.partitionSize)
     val
 logEndOffset = servers.head.logManager.getLog(tp0).get.logEndOffset
     assertTrue(s"LogEndOffset '$logEndOffset' should be > 0", logEndOffset > 0)
     assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp0, log0.logEndOffset, false), replicaInfo0.offsetLag)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index efae747952d..440dfd463ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -18,11 +18,12 @@ package org.apache.kafka.streams.integration;

 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Time;
@@ -120,13 +121,13 @@ public class PurgeRepartitionTopicIntegrationTest {
             time.sleep(PURGE_INTERVAL_MS);

             try {
-                final Collection<DescribeLogDirsResponse.LogDirInfo> logDirInfo =
-                    adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values();
+                final Collection<LogDirDescription> logDirInfo =
+                    adminClient.describeLogDirs(Collections.singleton(0)).descriptions().get(0).get().values();

-                for (final DescribeLogDirsResponse.LogDirInfo partitionInfo : logDirInfo) {
-                    final DescribeLogDirsResponse.ReplicaInfo replicaInfo =
-                        partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0));
-                    if (replicaInfo != null && verifier.verify(replicaInfo.size)) {
+                for (final LogDirDescription partitionInfo : logDirInfo) {
+                    final ReplicaInfo replicaInfo =
+                        partitionInfo.replicaInfos().get(new TopicPartition(REPARTITION_TOPIC, 0));
+                    if (replicaInfo != null && verifier.verify(replicaInfo.size())) {
                         return true;
                     }
                 }
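Taken together, the new types make per-replica monitoring straightforward, e.g. watching an AlterReplicaLogDirs move via `isFuture()` and `offsetLag()`. A sketch under the same illustrative assumptions as above (local broker 0, a partition named "topic"-0; not part of this patch):

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.TopicPartition;

import static java.util.Collections.singleton;

public class ReplicaMoveMonitor {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        TopicPartition tp = new TopicPartition("topic", 0);                      // illustrative

        try (Admin admin = Admin.create(props)) {
            Map<String, LogDirDescription> dirs =
                admin.describeLogDirs(singleton(0)).allDescriptions().get().get(0);
            for (Map.Entry<String, LogDirDescription> entry : dirs.entrySet()) {
                ReplicaInfo replica = entry.getValue().replicaInfos().get(tp);
                if (replica != null) {
                    // isFuture() marks a log created by AlterReplicaLogDirs that has not yet
                    // replaced the current log; offsetLag() shows how far it must still catch up.
                    System.out.printf("%s: size=%d, lag=%d, future=%b%n",
                        entry.getKey(), replica.size(), replica.offsetLag(), replica.isFuture());
                }
            }
        }
    }
}
```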