From 690f72dd69d31589655c84d3cc1a6eec006bcab5 Mon Sep 17 00:00:00 2001 From: "Colin P. Mccabe" Date: Tue, 9 Feb 2021 14:11:35 -0800 Subject: [PATCH] KAFKA-12334: Add the KIP-500 metadata shell The Kafka Metadata shell is a new command which allows users to interactively examine the metadata stored in a KIP-500 cluster. It can examine snapshot files that are specified via --snapshot. The metadata tool works by replaying the log and storing the state into in-memory nodes. These nodes are presented in a fashion similar to filesystem directories. Reviewers: Jason Gustafson , David Arthur , Igor Soarez --- bin/kafka-metadata-shell.sh | 17 + build.gradle | 43 ++ checkstyle/import-control.xml | 17 + checkstyle/suppressions.xml | 4 + gradle/dependencies.gradle | 2 + .../apache/kafka/metalog/LocalLogManager.java | 378 ++++++++++++++++++ .../common/metadata/IsrChangeRecord.json | 4 +- .../common/metadata/PartitionRecord.json | 4 +- .../kafka/metadata/MetadataParserTest.java | 2 +- .../kafka/metalog/LocalLogManagerTest.java | 2 +- settings.gradle | 1 + .../apache/kafka/shell/CatCommandHandler.java | 120 ++++++ .../apache/kafka/shell/CdCommandHandler.java | 117 ++++++ .../org/apache/kafka/shell/CommandUtils.java | 148 +++++++ .../java/org/apache/kafka/shell/Commands.java | 154 +++++++ .../kafka/shell/ErroneousCommandHandler.java | 58 +++ .../kafka/shell/ExitCommandHandler.java | 88 ++++ .../kafka/shell/FindCommandHandler.java | 121 ++++++ .../org/apache/kafka/shell/GlobComponent.java | 179 +++++++++ .../org/apache/kafka/shell/GlobVisitor.java | 148 +++++++ .../kafka/shell/HelpCommandHandler.java | 88 ++++ .../kafka/shell/HistoryCommandHandler.java | 108 +++++ .../apache/kafka/shell/InteractiveShell.java | 172 ++++++++ .../apache/kafka/shell/LsCommandHandler.java | 299 ++++++++++++++ .../apache/kafka/shell/ManCommandHandler.java | 109 +++++ .../org/apache/kafka/shell/MetadataNode.java | 140 +++++++ .../kafka/shell/MetadataNodeManager.java | 302 ++++++++++++++ .../org/apache/kafka/shell/MetadataShell.java | 174 ++++++++ .../kafka/shell/NoOpCommandHandler.java | 43 ++ .../kafka/shell/NotDirectoryException.java | 30 ++ .../apache/kafka/shell/NotFileException.java | 30 ++ .../apache/kafka/shell/PwdCommandHandler.java | 89 +++++ .../kafka/shell/SnapshotFileReader.java | 194 +++++++++ .../org/apache/kafka/shell/CommandTest.java | 70 ++++ .../apache/kafka/shell/CommandUtilsTest.java | 37 ++ .../apache/kafka/shell/GlobComponentTest.java | 75 ++++ .../apache/kafka/shell/GlobVisitorTest.java | 144 +++++++ .../kafka/shell/LsCommandHandlerTest.java | 99 +++++ .../apache/kafka/shell/MetadataNodeTest.java | 73 ++++ 39 files changed, 3879 insertions(+), 4 deletions(-) create mode 100755 bin/kafka-metadata-shell.sh create mode 100644 metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/CommandUtils.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/Commands.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/GlobComponent.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/MetadataNode.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/MetadataShell.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/NotFileException.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java create mode 100644 shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java create mode 100644 shell/src/test/java/org/apache/kafka/shell/CommandTest.java create mode 100644 shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java create mode 100644 shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java create mode 100644 shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java create mode 100644 shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java create mode 100644 shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java diff --git a/bin/kafka-metadata-shell.sh b/bin/kafka-metadata-shell.sh new file mode 100755 index 00000000000..289f0c1b51f --- /dev/null +++ b/bin/kafka-metadata-shell.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@" diff --git a/build.gradle b/build.gradle index 790cd116139..d03748848ce 100644 --- a/build.gradle +++ b/build.gradle @@ -1351,6 +1351,49 @@ project(':tools') { } } +project(':shell') { + archivesBaseName = "kafka-shell" + + dependencies { + compile libs.argparse4j + compile libs.jacksonDatabind + compile libs.jacksonJDK8Datatypes + compile libs.jline + compile libs.slf4jApi + compile project(':clients') + compile project(':core') + compile project(':log4j-appender') + compile project(':metadata') + compile project(':raft') + + compile libs.jacksonJaxrsJsonProvider + + testCompile project(':clients') + testCompile libs.junitJupiter + + testRuntime libs.slf4jlog4j + } + + javadoc { + enabled = false + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + from (configurations.testRuntime) { + include('jline-*jar') + } + from (configurations.runtime) { + include('jline-*jar') + } + into "$buildDir/dependant-libs-${versions.scala}" + duplicatesStrategy 'exclude' + } + + jar { + dependsOn 'copyDependantLibs' + } +} + project(':streams') { archivesBaseName = "kafka-streams" ext.buildStreamsVersionFileName = "kafka-streams-version.properties" diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index aad58b0ddd8..63ed7ab238b 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -269,6 +269,23 @@ + + + + + + + + + + + + + + + + + diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 76690bbc829..1cfc630c0ea 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -253,6 +253,10 @@ + + + diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 9340e3c3103..30193bc19f6 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -71,6 +71,7 @@ versions += [ jacoco: "0.8.5", jetty: "9.4.33.v20201020", jersey: "2.31", + jline: "3.12.1", jmh: "1.27", hamcrest: "2.2", log4j: "1.2.17", @@ -149,6 +150,7 @@ libs += [ jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty", jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey", jerseyHk2: "org.glassfish.jersey.inject:jersey-hk2:$versions.jersey", + jline: "org.jline:jline:$versions.jline", jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh", jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh", jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh", diff --git a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java new file mode 100644 index 00000000000..ef85314e0ef --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metalog; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +/** + * The LocalLogManager is a test implementation that relies on the contents of memory. + */ +public final class LocalLogManager implements MetaLogManager, AutoCloseable { + interface LocalBatch { + int size(); + } + + static class LeaderChangeBatch implements LocalBatch { + private final MetaLogLeader newLeader; + + LeaderChangeBatch(MetaLogLeader newLeader) { + this.newLeader = newLeader; + } + + @Override + public int size() { + return 1; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof LeaderChangeBatch)) return false; + LeaderChangeBatch other = (LeaderChangeBatch) o; + if (!other.newLeader.equals(newLeader)) return false; + return true; + } + + @Override + public int hashCode() { + return Objects.hash(newLeader); + } + + @Override + public String toString() { + return "LeaderChangeBatch(newLeader=" + newLeader + ")"; + } + } + + static class LocalRecordBatch implements LocalBatch { + private final List records; + + LocalRecordBatch(List records) { + this.records = records; + } + + @Override + public int size() { + return records.size(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof LocalRecordBatch)) return false; + LocalRecordBatch other = (LocalRecordBatch) o; + if (!other.records.equals(records)) return false; + return true; + } + + @Override + public int hashCode() { + return Objects.hash(records); + } + + @Override + public String toString() { + return "LocalRecordBatch(records=" + records + ")"; + } + } + + public static class SharedLogData { + private final Logger log = LoggerFactory.getLogger(SharedLogData.class); + private final HashMap logManagers = new HashMap<>(); + private final TreeMap batches = new TreeMap<>(); + private MetaLogLeader leader = new MetaLogLeader(-1, -1); + private long prevOffset = -1; + + synchronized void registerLogManager(LocalLogManager logManager) { + if (logManagers.put(logManager.nodeId(), logManager) != null) { + throw new RuntimeException("Can't have multiple LocalLogManagers " + + "with id " + logManager.nodeId()); + } + electLeaderIfNeeded(); + } + + synchronized void unregisterLogManager(LocalLogManager logManager) { + if (!logManagers.remove(logManager.nodeId(), logManager)) { + throw new RuntimeException("Log manager " + logManager.nodeId() + + " was not found."); + } + } + + synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) { + if (epoch != leader.epoch()) { + log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " + + "match the current leader epoch of {}.", nodeId, epoch, leader.epoch()); + return Long.MAX_VALUE; + } + if (nodeId != leader.nodeId()) { + log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " + + "match the current leader id of {}.", nodeId, leader.nodeId()); + return Long.MAX_VALUE; + } + log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch); + long offset = append(batch); + electLeaderIfNeeded(); + return offset; + } + + synchronized long append(LocalBatch batch) { + prevOffset += batch.size(); + log.debug("append(batch={}, prevOffset={})", batch, prevOffset); + batches.put(prevOffset, batch); + if (batch instanceof LeaderChangeBatch) { + LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch; + leader = leaderChangeBatch.newLeader; + } + for (LocalLogManager logManager : logManagers.values()) { + logManager.scheduleLogCheck(); + } + return prevOffset; + } + + synchronized void electLeaderIfNeeded() { + if (leader.nodeId() != -1 || logManagers.isEmpty()) { + return; + } + int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size()); + Iterator iter = logManagers.keySet().iterator(); + Integer nextLeaderNode = null; + for (int i = 0; i <= nextLeaderIndex; i++) { + nextLeaderNode = iter.next(); + } + MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1); + log.info("Elected new leader: {}.", newLeader); + append(new LeaderChangeBatch(newLeader)); + } + + synchronized Entry nextBatch(long offset) { + Entry entry = batches.higherEntry(offset); + if (entry == null) { + return null; + } + return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue()); + } + } + + private static class MetaLogListenerData { + private long offset = -1; + private final MetaLogListener listener; + + MetaLogListenerData(MetaLogListener listener) { + this.listener = listener; + } + } + + private final Logger log; + + private final int nodeId; + + private final SharedLogData shared; + + private final EventQueue eventQueue; + + private boolean initialized = false; + + private boolean shutdown = false; + + private long maxReadOffset = Long.MAX_VALUE; + + private final List listeners = new ArrayList<>(); + + private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1); + + public LocalLogManager(LogContext logContext, + int nodeId, + SharedLogData shared, + String threadNamePrefix) { + this.log = logContext.logger(LocalLogManager.class); + this.nodeId = nodeId; + this.shared = shared; + this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix); + shared.registerLogManager(this); + } + + private void scheduleLogCheck() { + eventQueue.append(() -> { + try { + log.debug("Node {}: running log check.", nodeId); + int numEntriesFound = 0; + for (MetaLogListenerData listenerData : listeners) { + while (true) { + Entry entry = shared.nextBatch(listenerData.offset); + if (entry == null) { + log.trace("Node {}: reached the end of the log after finding " + + "{} entries.", nodeId, numEntriesFound); + break; + } + long entryOffset = entry.getKey(); + if (entryOffset > maxReadOffset) { + log.trace("Node {}: after {} entries, not reading the next " + + "entry because its offset is {}, and maxReadOffset is {}.", + nodeId, numEntriesFound, entryOffset, maxReadOffset); + break; + } + if (entry.getValue() instanceof LeaderChangeBatch) { + LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue(); + log.trace("Node {}: handling LeaderChange to {}.", + nodeId, batch.newLeader); + listenerData.listener.handleNewLeader(batch.newLeader); + if (batch.newLeader.epoch() > leader.epoch()) { + leader = batch.newLeader; + } + } else if (entry.getValue() instanceof LocalRecordBatch) { + LocalRecordBatch batch = (LocalRecordBatch) entry.getValue(); + log.trace("Node {}: handling LocalRecordBatch with offset {}.", + nodeId, entryOffset); + listenerData.listener.handleCommits(entryOffset, batch.records); + } + numEntriesFound++; + listenerData.offset = entryOffset; + } + } + log.trace("Completed log check for node " + nodeId); + } catch (Exception e) { + log.error("Exception while handling log check", e); + } + }); + } + + public void beginShutdown() { + eventQueue.beginShutdown("beginShutdown", () -> { + try { + if (initialized && !shutdown) { + log.debug("Node {}: beginning shutdown.", nodeId); + renounce(leader.epoch()); + for (MetaLogListenerData listenerData : listeners) { + listenerData.listener.beginShutdown(); + } + shared.unregisterLogManager(this); + } + } catch (Exception e) { + log.error("Unexpected exception while sending beginShutdown callbacks", e); + } + shutdown = true; + }); + } + + @Override + public void close() throws InterruptedException { + log.debug("Node {}: closing.", nodeId); + beginShutdown(); + eventQueue.close(); + } + + @Override + public void initialize() throws Exception { + eventQueue.append(() -> { + log.debug("initialized local log manager for node " + nodeId); + initialized = true; + }); + } + + @Override + public void register(MetaLogListener listener) throws Exception { + CompletableFuture future = new CompletableFuture<>(); + eventQueue.append(() -> { + if (shutdown) { + log.info("Node {}: can't register because local log manager has " + + "already been shut down.", nodeId); + future.complete(null); + } else if (initialized) { + log.info("Node {}: registered MetaLogListener.", nodeId); + listeners.add(new MetaLogListenerData(listener)); + shared.electLeaderIfNeeded(); + scheduleLogCheck(); + future.complete(null); + } else { + log.info("Node {}: can't register because local log manager has not " + + "been initialized.", nodeId); + future.completeExceptionally(new RuntimeException( + "LocalLogManager was not initialized.")); + } + }); + future.get(); + } + + @Override + public long scheduleWrite(long epoch, List batch) { + return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch( + batch.stream().map(r -> r.message()).collect(Collectors.toList()))); + } + + @Override + public void renounce(long epoch) { + MetaLogLeader curLeader = leader; + MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1); + shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader)); + } + + @Override + public MetaLogLeader leader() { + return leader; + } + + @Override + public int nodeId() { + return nodeId; + } + + public List listeners() { + final CompletableFuture> future = new CompletableFuture<>(); + eventQueue.append(() -> { + future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList())); + }); + try { + return future.get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + public void setMaxReadOffset(long maxReadOffset) { + CompletableFuture future = new CompletableFuture<>(); + eventQueue.append(() -> { + log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset); + this.maxReadOffset = maxReadOffset; + scheduleLogCheck(); + future.complete(null); + }); + try { + future.get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/metadata/src/main/resources/common/metadata/IsrChangeRecord.json b/metadata/src/main/resources/common/metadata/IsrChangeRecord.json index e4583909095..fd8d8341787 100644 --- a/metadata/src/main/resources/common/metadata/IsrChangeRecord.json +++ b/metadata/src/main/resources/common/metadata/IsrChangeRecord.json @@ -28,6 +28,8 @@ { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "about": "The lead replica, or -1 if there is no leader." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", - "about": "An epoch that gets incremented each time we change the ISR." } + "about": "An epoch that gets incremented each time we change the partition leader." }, + { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1", + "about": "An epoch that gets incremented each time we change anything in the partition." } ] } diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json b/metadata/src/main/resources/common/metadata/PartitionRecord.json index 79c24c2f1ac..5cc7d1328c9 100644 --- a/metadata/src/main/resources/common/metadata/PartitionRecord.json +++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json @@ -34,6 +34,8 @@ { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "about": "The lead replica, or -1 if there is no leader." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", - "about": "An epoch that gets incremented each time we change the ISR." } + "about": "An epoch that gets incremented each time we change the partition leader." }, + { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1", + "about": "An epoch that gets incremented each time we change anything in the partition." } ] } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java index 6d673b3afdd..41e968c4d5c 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java @@ -82,7 +82,7 @@ public class MetadataParserTest { PartitionRecord partitionRecord = new PartitionRecord(). setReplicas(longReplicaList); ObjectSerializationCache cache = new ObjectSerializationCache(); - assertEquals("Event size would be 33554478, but the maximum serialized event " + + assertEquals("Event size would be 33554482, but the maximum serialized event " + "size is 33554432", assertThrows(RuntimeException.class, () -> { MetadataParser.size(partitionRecord, (short) 0, cache); }).getMessage()); diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java index 9d4eb8b594e..9dd6262ff66 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java @@ -51,7 +51,7 @@ public class LocalLogManagerTest { } /** - * Test that the local log maanger will claim leadership. + * Test that the local log manager will claim leadership. */ @Test public void testClaimsLeadership() throws Exception { diff --git a/settings.gradle b/settings.gradle index 55f77f3bed3..fedfa9a650c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -29,6 +29,7 @@ include 'clients', 'log4j-appender', 'metadata', 'raft', + 'shell', 'streams', 'streams:examples', 'streams:streams-scala', diff --git a/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java new file mode 100644 index 00000000000..3fc94279565 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.shell.MetadataNode.DirectoryNode; +import org.apache.kafka.shell.MetadataNode.FileNode; +import org.jline.reader.Candidate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Implements the cat command. + */ +public final class CatCommandHandler implements Commands.Handler { + private static final Logger log = LoggerFactory.getLogger(CatCommandHandler.class); + + public final static Commands.Type TYPE = new CatCommandType(); + + public static class CatCommandType implements Commands.Type { + private CatCommandType() { + } + + @Override + public String name() { + return "cat"; + } + + @Override + public String description() { + return "Show the contents of metadata nodes."; + } + + @Override + public boolean shellOnly() { + return false; + } + + @Override + public void addArguments(ArgumentParser parser) { + parser.addArgument("targets"). + nargs("+"). + help("The metadata nodes to display."); + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + return new CatCommandHandler(namespace.getList("targets")); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1), + candidates); + } + } + + private final List targets; + + public CatCommandHandler(List targets) { + this.targets = targets; + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) throws Exception { + log.trace("cat " + targets); + for (String target : targets) { + manager.visit(new GlobVisitor(target, entryOption -> { + if (entryOption.isPresent()) { + MetadataNode node = entryOption.get().node(); + if (node instanceof DirectoryNode) { + writer.println("cat: " + target + ": Is a directory"); + } else if (node instanceof FileNode) { + FileNode fileNode = (FileNode) node; + writer.println(fileNode.contents()); + } + } else { + writer.println("cat: " + target + ": No such file or directory."); + } + })); + } + } + + @Override + public int hashCode() { + return targets.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof CatCommandHandler)) return false; + CatCommandHandler o = (CatCommandHandler) other; + if (!Objects.equals(o.targets, targets)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java new file mode 100644 index 00000000000..8d270e54328 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.shell.MetadataNode.DirectoryNode; +import org.jline.reader.Candidate; + +import java.io.PrintWriter; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; + +/** + * Implements the cd command. + */ +public final class CdCommandHandler implements Commands.Handler { + public final static Commands.Type TYPE = new CdCommandType(); + + public static class CdCommandType implements Commands.Type { + private CdCommandType() { + } + + @Override + public String name() { + return "cd"; + } + + @Override + public String description() { + return "Set the current working directory."; + } + + @Override + public boolean shellOnly() { + return true; + } + + @Override + public void addArguments(ArgumentParser parser) { + parser.addArgument("target"). + nargs("?"). + help("The directory to change to."); + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + return new CdCommandHandler(Optional.ofNullable(namespace.getString("target"))); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + if (nextWords.size() == 1) { + CommandUtils.completePath(nodeManager, nextWords.get(0), candidates); + } + } + } + + private final Optional target; + + public CdCommandHandler(Optional target) { + this.target = target; + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) throws Exception { + String effectiveTarget = target.orElse("/"); + manager.visit(new Consumer() { + @Override + public void accept(MetadataNodeManager.Data data) { + new GlobVisitor(effectiveTarget, entryOption -> { + if (entryOption.isPresent()) { + if (!(entryOption.get().node() instanceof DirectoryNode)) { + writer.println("cd: " + effectiveTarget + ": not a directory."); + } else { + data.setWorkingDirectory(entryOption.get().absolutePath()); + } + } else { + writer.println("cd: " + effectiveTarget + ": no such directory."); + } + }).accept(data); + } + }); + } + + @Override + public int hashCode() { + return target.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof CdCommandHandler)) return false; + CdCommandHandler o = (CdCommandHandler) other; + if (!o.target.equals(target)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java b/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java new file mode 100644 index 00000000000..0639172e95d --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import org.apache.kafka.shell.MetadataNode.DirectoryNode; +import org.jline.reader.Candidate; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +/** + * Utility functions for command handlers. + */ +public final class CommandUtils { + /** + * Convert a list of paths into the effective list of paths which should be used. + * Empty strings will be removed. If no paths are given, the current working + * directory will be used. + * + * @param paths The input paths. Non-null. + * + * @return The output paths. + */ + public static List getEffectivePaths(List paths) { + List effectivePaths = new ArrayList<>(); + for (String path : paths) { + if (!path.isEmpty()) { + effectivePaths.add(path); + } + } + if (effectivePaths.isEmpty()) { + effectivePaths.add("."); + } + return effectivePaths; + } + + /** + * Generate a list of potential completions for a prefix of a command name. + * + * @param commandPrefix The command prefix. Non-null. + * @param candidates The list to add the output completions to. + */ + public static void completeCommand(String commandPrefix, List candidates) { + String command = Commands.TYPES.ceilingKey(commandPrefix); + while (command != null && command.startsWith(commandPrefix)) { + candidates.add(new Candidate(command)); + command = Commands.TYPES.higherKey(command); + } + } + + /** + * Convert a path to a list of path components. + * Multiple slashes in a row are treated the same as a single slash. + * Trailing slashes are ignored. + */ + public static List splitPath(String path) { + List results = new ArrayList<>(); + String[] components = path.split("/"); + for (int i = 0; i < components.length; i++) { + if (!components[i].isEmpty()) { + results.add(components[i]); + } + } + return results; + } + + public static List stripDotPathComponents(List input) { + List output = new ArrayList<>(); + for (String string : input) { + if (string.equals("..")) { + if (output.size() > 0) { + output.remove(output.size() - 1); + } + } else if (!string.equals(".")) { + output.add(string); + } + } + return output; + } + + /** + * Generate a list of potential completions for a path. + * + * @param nodeManager The NodeManager. + * @param pathPrefix The path prefix. Non-null. + * @param candidates The list to add the output completions to. + */ + public static void completePath(MetadataNodeManager nodeManager, + String pathPrefix, + List candidates) throws Exception { + nodeManager.visit(data -> { + String absolutePath = pathPrefix.startsWith("/") ? + pathPrefix : data.workingDirectory() + "/" + pathPrefix; + List pathComponents = stripDotPathComponents(splitPath(absolutePath)); + DirectoryNode directory = data.root(); + int numDirectories = pathPrefix.endsWith("/") ? + pathComponents.size() : pathComponents.size() - 1; + for (int i = 0; i < numDirectories; i++) { + MetadataNode node = directory.child(pathComponents.get(i)); + if (node == null || !(node instanceof DirectoryNode)) { + return; + } + directory = (DirectoryNode) node; + } + String lastComponent = ""; + if (numDirectories >= 0 && numDirectories < pathComponents.size()) { + lastComponent = pathComponents.get(numDirectories); + } + Entry candidate = + directory.children().ceilingEntry(lastComponent); + String effectivePrefix; + int lastSlash = pathPrefix.lastIndexOf('/'); + if (lastSlash < 0) { + effectivePrefix = ""; + } else { + effectivePrefix = pathPrefix.substring(0, lastSlash + 1); + } + while (candidate != null && candidate.getKey().startsWith(lastComponent)) { + StringBuilder candidateBuilder = new StringBuilder(); + candidateBuilder.append(effectivePrefix).append(candidate.getKey()); + boolean complete = true; + if (candidate.getValue() instanceof DirectoryNode) { + candidateBuilder.append("/"); + complete = false; + } + candidates.add(new Candidate(candidateBuilder.toString(), + candidateBuilder.toString(), null, null, null, null, complete)); + candidate = directory.children().higherEntry(candidate.getKey()); + } + }); + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/Commands.java b/shell/src/main/java/org/apache/kafka/shell/Commands.java new file mode 100644 index 00000000000..db16411ebae --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/Commands.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import net.sourceforge.argparse4j.internal.HelpScreenException; +import org.jline.reader.Candidate; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; + +/** + * The commands for the Kafka metadata tool. + */ +public final class Commands { + /** + * A map from command names to command types. + */ + static final NavigableMap TYPES; + + static { + TreeMap typesMap = new TreeMap<>(); + for (Type type : Arrays.asList( + CatCommandHandler.TYPE, + CdCommandHandler.TYPE, + ExitCommandHandler.TYPE, + FindCommandHandler.TYPE, + HelpCommandHandler.TYPE, + HistoryCommandHandler.TYPE, + LsCommandHandler.TYPE, + ManCommandHandler.TYPE, + PwdCommandHandler.TYPE)) { + typesMap.put(type.name(), type); + } + TYPES = Collections.unmodifiableNavigableMap(typesMap); + } + + /** + * Command handler objects are instantiated with specific arguments to + * execute commands. + */ + public interface Handler { + void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) throws Exception; + } + + /** + * An object which describes a type of command handler. This includes + * information like its name, help text, and whether it should be accessible + * from non-interactive mode. + */ + public interface Type { + String name(); + String description(); + boolean shellOnly(); + void addArguments(ArgumentParser parser); + Handler createHandler(Namespace namespace); + void completeNext(MetadataNodeManager nodeManager, + List nextWords, + List candidates) throws Exception; + } + + private final ArgumentParser parser; + + /** + * Create the commands instance. + * + * @param addShellCommands True if we should include the shell-only commands. + */ + public Commands(boolean addShellCommands) { + this.parser = ArgumentParsers.newArgumentParser("", false); + Subparsers subparsers = this.parser.addSubparsers().dest("command"); + for (Type type : TYPES.values()) { + if (addShellCommands || !type.shellOnly()) { + Subparser subParser = subparsers.addParser(type.name()); + subParser.help(type.description()); + type.addArguments(subParser); + } + } + } + + ArgumentParser parser() { + return parser; + } + + /** + * Handle the given command. + * + * In general this function should not throw exceptions. Instead, it should + * return ErroneousCommandHandler if the input was invalid. + * + * @param arguments The command line arguments. + * @return The command handler. + */ + public Handler parseCommand(List arguments) { + List trimmedArguments = new ArrayList<>(arguments); + while (true) { + if (trimmedArguments.isEmpty()) { + return new NoOpCommandHandler(); + } + String last = trimmedArguments.get(trimmedArguments.size() - 1); + if (!last.isEmpty()) { + break; + } + trimmedArguments.remove(trimmedArguments.size() - 1); + } + Namespace namespace; + try { + namespace = parser.parseArgs(trimmedArguments.toArray(new String[0])); + } catch (HelpScreenException e) { + return new NoOpCommandHandler(); + } catch (ArgumentParserException e) { + return new ErroneousCommandHandler(e.getMessage()); + } + String command = namespace.get("command"); + if (!command.equals(trimmedArguments.get(0))) { + return new ErroneousCommandHandler("invalid choice: '" + + trimmedArguments.get(0) + "': did you mean '" + command + "'?"); + } + Type type = TYPES.get(command); + if (type == null) { + return new ErroneousCommandHandler("Unknown command specified: " + command); + } else { + return type.createHandler(namespace); + } + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java new file mode 100644 index 00000000000..d52c55f9630 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import java.io.PrintWriter; +import java.util.Objects; +import java.util.Optional; + +/** + * Handles erroneous commands. + */ +public final class ErroneousCommandHandler implements Commands.Handler { + private final String message; + + public ErroneousCommandHandler(String message) { + this.message = message; + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) { + writer.println(message); + } + + @Override + public int hashCode() { + return Objects.hashCode(message); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ErroneousCommandHandler)) return false; + ErroneousCommandHandler o = (ErroneousCommandHandler) other; + if (!Objects.equals(o.message, message)) return false; + return true; + } + + @Override + public String toString() { + return "ErroneousCommandHandler(" + message + ")"; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java new file mode 100644 index 00000000000..2b11b352a8f --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.utils.Exit; +import org.jline.reader.Candidate; + +import java.io.PrintWriter; +import java.util.List; +import java.util.Optional; + +/** + * Implements the exit command. + */ +public final class ExitCommandHandler implements Commands.Handler { + public final static Commands.Type TYPE = new ExitCommandType(); + + public static class ExitCommandType implements Commands.Type { + private ExitCommandType() { + } + + @Override + public String name() { + return "exit"; + } + + @Override + public String description() { + return "Exit the metadata shell."; + } + + @Override + public boolean shellOnly() { + return true; + } + + @Override + public void addArguments(ArgumentParser parser) { + // nothing to do + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + return new ExitCommandHandler(); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + // nothing to do + } + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) { + Exit.exit(0); + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ExitCommandHandler)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java new file mode 100644 index 00000000000..6d9ae44654b --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.shell.MetadataNode.DirectoryNode; +import org.jline.reader.Candidate; + +import java.io.PrintWriter; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; + +/** + * Implements the find command. + */ +public final class FindCommandHandler implements Commands.Handler { + public final static Commands.Type TYPE = new FindCommandType(); + + public static class FindCommandType implements Commands.Type { + private FindCommandType() { + } + + @Override + public String name() { + return "find"; + } + + @Override + public String description() { + return "Search for nodes in the directory hierarchy."; + } + + @Override + public boolean shellOnly() { + return false; + } + + @Override + public void addArguments(ArgumentParser parser) { + parser.addArgument("paths"). + nargs("*"). + help("The paths to start at."); + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + return new FindCommandHandler(namespace.getList("paths")); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1), + candidates); + } + } + + private final List paths; + + public FindCommandHandler(List paths) { + this.paths = paths; + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) throws Exception { + for (String path : CommandUtils.getEffectivePaths(paths)) { + manager.visit(new GlobVisitor(path, entryOption -> { + if (entryOption.isPresent()) { + find(writer, path, entryOption.get().node()); + } else { + writer.println("find: " + path + ": no such file or directory."); + } + })); + } + } + + private void find(PrintWriter writer, String path, MetadataNode node) { + writer.println(path); + if (node instanceof DirectoryNode) { + DirectoryNode directory = (DirectoryNode) node; + for (Entry entry : directory.children().entrySet()) { + String nextPath = path.equals("/") ? + path + entry.getKey() : path + "/" + entry.getKey(); + find(writer, nextPath, entry.getValue()); + } + } + } + + @Override + public int hashCode() { + return Objects.hashCode(paths); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof FindCommandHandler)) return false; + FindCommandHandler o = (FindCommandHandler) other; + if (!Objects.equals(o.paths, paths)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java b/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java new file mode 100644 index 00000000000..b93382b258e --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Pattern; + +/** + * Implements a per-path-component glob. + */ +public final class GlobComponent { + private static final Logger log = LoggerFactory.getLogger(GlobComponent.class); + + /** + * Returns true if the character is a special character for regular expressions. + */ + private static boolean isRegularExpressionSpecialCharacter(char ch) { + switch (ch) { + case '$': + case '(': + case ')': + case '+': + case '.': + case '[': + case ']': + case '^': + case '{': + case '|': + return true; + default: + break; + } + return false; + } + + /** + * Returns true if the character is a special character for globs. + */ + private static boolean isGlobSpecialCharacter(char ch) { + switch (ch) { + case '*': + case '?': + case '\\': + case '{': + case '}': + return true; + default: + break; + } + return false; + } + + /** + * Converts a glob string to a regular expression string. + * Returns null if the glob should be handled as a literal (can only match one string). + * Throws an exception if the glob is malformed. + */ + static String toRegularExpression(String glob) { + StringBuilder output = new StringBuilder("^"); + boolean literal = true; + boolean processingGroup = false; + + for (int i = 0; i < glob.length(); ) { + char c = glob.charAt(i++); + switch (c) { + case '?': + literal = false; + output.append("."); + break; + case '*': + literal = false; + output.append(".*"); + break; + case '\\': + if (i == glob.length()) { + output.append(c); + } else { + char next = glob.charAt(i); + i++; + if (isGlobSpecialCharacter(next) || + isRegularExpressionSpecialCharacter(next)) { + output.append('\\'); + } + output.append(next); + } + break; + case '{': + if (processingGroup) { + throw new RuntimeException("Can't nest glob groups."); + } + literal = false; + output.append("(?:(?:"); + processingGroup = true; + break; + case ',': + if (processingGroup) { + literal = false; + output.append(")|(?:"); + } else { + output.append(c); + } + break; + case '}': + if (processingGroup) { + literal = false; + output.append("))"); + processingGroup = false; + } else { + output.append(c); + } + break; + // TODO: handle character ranges + default: + if (isRegularExpressionSpecialCharacter(c)) { + output.append('\\'); + } + output.append(c); + } + } + if (processingGroup) { + throw new RuntimeException("Unterminated glob group."); + } + if (literal) { + return null; + } + output.append('$'); + return output.toString(); + } + + private final String component; + private final Pattern pattern; + + public GlobComponent(String component) { + this.component = component; + Pattern newPattern = null; + try { + String regularExpression = toRegularExpression(component); + if (regularExpression != null) { + newPattern = Pattern.compile(regularExpression); + } + } catch (RuntimeException e) { + log.debug("Invalid glob pattern: " + e.getMessage()); + } + this.pattern = newPattern; + } + + public String component() { + return component; + } + + public boolean literal() { + return pattern == null; + } + + public boolean matches(String nodeName) { + if (pattern == null) { + return component.equals(nodeName); + } else { + return pattern.matcher(nodeName).matches(); + } + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java b/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java new file mode 100644 index 00000000000..8081b7e4450 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import java.util.Arrays; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; + +/** + * Visits metadata paths based on a glob string. + */ +public final class GlobVisitor implements Consumer { + private final String glob; + private final Consumer> handler; + + public GlobVisitor(String glob, + Consumer> handler) { + this.glob = glob; + this.handler = handler; + } + + public static class MetadataNodeInfo { + private final String[] path; + private final MetadataNode node; + + MetadataNodeInfo(String[] path, MetadataNode node) { + this.path = path; + this.node = node; + } + + public String[] path() { + return path; + } + + public MetadataNode node() { + return node; + } + + public String lastPathComponent() { + if (path.length == 0) { + return "/"; + } else { + return path[path.length - 1]; + } + } + + public String absolutePath() { + return "/" + String.join("/", path); + } + + @Override + public int hashCode() { + return Objects.hash(path, node); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof MetadataNodeInfo)) return false; + MetadataNodeInfo other = (MetadataNodeInfo) o; + if (!Arrays.equals(path, other.path)) return false; + if (!node.equals(other.node)) return false; + return true; + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder("MetadataNodeInfo(path="); + for (int i = 0; i < path.length; i++) { + bld.append("/"); + bld.append(path[i]); + } + bld.append(", node=").append(node).append(")"); + return bld.toString(); + } + } + + @Override + public void accept(MetadataNodeManager.Data data) { + String fullGlob = glob.startsWith("/") ? glob : + data.workingDirectory() + "/" + glob; + List globComponents = + CommandUtils.stripDotPathComponents(CommandUtils.splitPath(fullGlob)); + if (!accept(globComponents, 0, data.root(), new String[0])) { + handler.accept(Optional.empty()); + } + } + + private boolean accept(List globComponents, + int componentIndex, + MetadataNode node, + String[] path) { + if (componentIndex >= globComponents.size()) { + handler.accept(Optional.of(new MetadataNodeInfo(path, node))); + return true; + } + String globComponentString = globComponents.get(componentIndex); + GlobComponent globComponent = new GlobComponent(globComponentString); + if (globComponent.literal()) { + if (!(node instanceof MetadataNode.DirectoryNode)) { + return false; + } + MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node; + MetadataNode child = directory.child(globComponent.component()); + if (child == null) { + return false; + } + String[] newPath = new String[path.length + 1]; + System.arraycopy(path, 0, newPath, 0, path.length); + newPath[path.length] = globComponent.component(); + return accept(globComponents, componentIndex + 1, child, newPath); + } + if (!(node instanceof MetadataNode.DirectoryNode)) { + return false; + } + MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node; + boolean matchedAny = false; + for (Entry entry : directory.children().entrySet()) { + String nodeName = entry.getKey(); + if (globComponent.matches(nodeName)) { + String[] newPath = new String[path.length + 1]; + System.arraycopy(path, 0, newPath, 0, path.length); + newPath[path.length] = nodeName; + if (accept(globComponents, componentIndex + 1, entry.getValue(), newPath)) { + matchedAny = true; + } + } + } + return matchedAny; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java new file mode 100644 index 00000000000..829274eefcc --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.jline.reader.Candidate; + +import java.io.PrintWriter; +import java.util.List; +import java.util.Optional; + +/** + * Implements the help command. + */ +public final class HelpCommandHandler implements Commands.Handler { + public final static Commands.Type TYPE = new HelpCommandType(); + + public static class HelpCommandType implements Commands.Type { + private HelpCommandType() { + } + + @Override + public String name() { + return "help"; + } + + @Override + public String description() { + return "Display this help message."; + } + + @Override + public boolean shellOnly() { + return true; + } + + @Override + public void addArguments(ArgumentParser parser) { + // nothing to do + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + return new HelpCommandHandler(); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + // nothing to do + } + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) { + writer.printf("Welcome to the Apache Kafka metadata shell.%n%n"); + new Commands(true).parser().printHelp(writer); + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof HelpCommandHandler)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java new file mode 100644 index 00000000000..edf9def4c87 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.jline.reader.Candidate; + +import java.io.PrintWriter; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Implements the history command. + */ +public final class HistoryCommandHandler implements Commands.Handler { + public final static Commands.Type TYPE = new HistoryCommandType(); + + public static class HistoryCommandType implements Commands.Type { + private HistoryCommandType() { + } + + @Override + public String name() { + return "history"; + } + + @Override + public String description() { + return "Print command history."; + } + + @Override + public boolean shellOnly() { + return true; + } + + @Override + public void addArguments(ArgumentParser parser) { + parser.addArgument("numEntriesToShow"). + nargs("?"). + type(Integer.class). + help("The number of entries to show."); + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + Integer numEntriesToShow = namespace.getInt("numEntriesToShow"); + return new HistoryCommandHandler(numEntriesToShow == null ? + Integer.MAX_VALUE : numEntriesToShow); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + // nothing to do + } + } + + private final int numEntriesToShow; + + public HistoryCommandHandler(int numEntriesToShow) { + this.numEntriesToShow = numEntriesToShow; + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) throws Exception { + if (!shell.isPresent()) { + throw new RuntimeException("The history command requires a shell."); + } + Iterator> iter = shell.get().history(numEntriesToShow); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + writer.printf("% 5d %s%n", entry.getKey(), entry.getValue()); + } + } + + @Override + public int hashCode() { + return numEntriesToShow; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof HistoryCommandHandler)) return false; + HistoryCommandHandler o = (HistoryCommandHandler) other; + return o.numEntriesToShow == numEntriesToShow; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java b/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java new file mode 100644 index 00000000000..aa4d4ea56cf --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import org.jline.reader.Candidate; +import org.jline.reader.Completer; +import org.jline.reader.EndOfFileException; +import org.jline.reader.History; +import org.jline.reader.LineReader; +import org.jline.reader.LineReaderBuilder; +import org.jline.reader.ParsedLine; +import org.jline.reader.Parser; +import org.jline.reader.UserInterruptException; +import org.jline.reader.impl.DefaultParser; +import org.jline.reader.impl.history.DefaultHistory; +import org.jline.terminal.Terminal; +import org.jline.terminal.TerminalBuilder; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Optional; + +/** + * The Kafka metadata shell. + */ +public final class InteractiveShell implements AutoCloseable { + static class MetadataShellCompleter implements Completer { + private final MetadataNodeManager nodeManager; + + MetadataShellCompleter(MetadataNodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + @Override + public void complete(LineReader reader, ParsedLine line, List candidates) { + if (line.words().size() == 0) { + CommandUtils.completeCommand("", candidates); + } else if (line.words().size() == 1) { + CommandUtils.completeCommand(line.words().get(0), candidates); + } else { + Iterator iter = line.words().iterator(); + String command = iter.next(); + List nextWords = new ArrayList<>(); + while (iter.hasNext()) { + nextWords.add(iter.next()); + } + Commands.Type type = Commands.TYPES.get(command); + if (type == null) { + return; + } + try { + type.completeNext(nodeManager, nextWords, candidates); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + + private final MetadataNodeManager nodeManager; + private final Terminal terminal; + private final Parser parser; + private final History history; + private final MetadataShellCompleter completer; + private final LineReader reader; + + public InteractiveShell(MetadataNodeManager nodeManager) throws IOException { + this.nodeManager = nodeManager; + TerminalBuilder builder = TerminalBuilder.builder(). + system(true). + nativeSignals(true); + this.terminal = builder.build(); + this.parser = new DefaultParser(); + this.history = new DefaultHistory(); + this.completer = new MetadataShellCompleter(nodeManager); + this.reader = LineReaderBuilder.builder(). + terminal(terminal). + parser(parser). + history(history). + completer(completer). + option(LineReader.Option.AUTO_FRESH_LINE, false). + build(); + } + + public void runMainLoop() throws Exception { + terminal.writer().println("[ Kafka Metadata Shell ]"); + terminal.flush(); + Commands commands = new Commands(true); + while (true) { + try { + reader.readLine(">> "); + ParsedLine parsedLine = reader.getParsedLine(); + Commands.Handler handler = commands.parseCommand(parsedLine.words()); + handler.run(Optional.of(this), terminal.writer(), nodeManager); + terminal.writer().flush(); + } catch (UserInterruptException eof) { + // Handle the user pressing control-C. + terminal.writer().println("^C"); + } catch (EndOfFileException eof) { + return; + } + } + } + + public int screenWidth() { + return terminal.getWidth(); + } + + public Iterator> history(int numEntriesToShow) { + if (numEntriesToShow < 0) { + numEntriesToShow = 0; + } + int last = history.last(); + if (numEntriesToShow > last + 1) { + numEntriesToShow = last + 1; + } + int first = last - numEntriesToShow + 1; + if (first < history.first()) { + first = history.first(); + } + return new HistoryIterator(first, last); + } + + public class HistoryIterator implements Iterator> { + private int index; + private int last; + + HistoryIterator(int index, int last) { + this.index = index; + this.last = last; + } + + @Override + public boolean hasNext() { + return index <= last; + } + + @Override + public Entry next() { + if (index > last) { + throw new NoSuchElementException(); + } + int p = index++; + return new AbstractMap.SimpleImmutableEntry<>(p, history.get(p)); + } + } + + @Override + public void close() throws IOException { + terminal.close(); + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java new file mode 100644 index 00000000000..6260d122bfe --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo; +import org.apache.kafka.shell.MetadataNode.DirectoryNode; +import org.apache.kafka.shell.MetadataNode.FileNode; +import org.jline.reader.Candidate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; + +/** + * Implements the ls command. + */ +public final class LsCommandHandler implements Commands.Handler { + private static final Logger log = LoggerFactory.getLogger(LsCommandHandler.class); + + public final static Commands.Type TYPE = new LsCommandType(); + + public static class LsCommandType implements Commands.Type { + private LsCommandType() { + } + + @Override + public String name() { + return "ls"; + } + + @Override + public String description() { + return "List metadata nodes."; + } + + @Override + public boolean shellOnly() { + return false; + } + + @Override + public void addArguments(ArgumentParser parser) { + parser.addArgument("targets"). + nargs("*"). + help("The metadata node paths to list."); + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + return new LsCommandHandler(namespace.getList("targets")); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1), + candidates); + } + } + + private final List targets; + + public LsCommandHandler(List targets) { + this.targets = targets; + } + + static class TargetDirectory { + private final String name; + private final List children; + + TargetDirectory(String name, List children) { + this.name = name; + this.children = children; + } + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) throws Exception { + List targetFiles = new ArrayList<>(); + List targetDirectories = new ArrayList<>(); + for (String target : CommandUtils.getEffectivePaths(targets)) { + manager.visit(new GlobVisitor(target, entryOption -> { + if (entryOption.isPresent()) { + MetadataNodeInfo info = entryOption.get(); + MetadataNode node = info.node(); + if (node instanceof DirectoryNode) { + DirectoryNode directory = (DirectoryNode) node; + List children = new ArrayList<>(); + children.addAll(directory.children().keySet()); + targetDirectories.add( + new TargetDirectory(info.lastPathComponent(), children)); + } else if (node instanceof FileNode) { + targetFiles.add(info.lastPathComponent()); + } + } else { + writer.println("ls: " + target + ": no such file or directory."); + } + })); + } + OptionalInt screenWidth = shell.isPresent() ? + OptionalInt.of(shell.get().screenWidth()) : OptionalInt.empty(); + log.trace("LS : targetFiles = {}, targetDirectories = {}, screenWidth = {}", + targetFiles, targetDirectories, screenWidth); + printTargets(writer, screenWidth, targetFiles, targetDirectories); + } + + static void printTargets(PrintWriter writer, + OptionalInt screenWidth, + List targetFiles, + List targetDirectories) { + printEntries(writer, "", screenWidth, targetFiles); + boolean needIntro = targetFiles.size() > 0 || targetDirectories.size() > 1; + boolean firstIntro = targetFiles.isEmpty(); + for (TargetDirectory targetDirectory : targetDirectories) { + String intro = ""; + if (needIntro) { + if (!firstIntro) { + intro = intro + String.format("%n"); + } + intro = intro + targetDirectory.name + ":"; + firstIntro = false; + } + log.trace("LS : targetDirectory name = {}, children = {}", + targetDirectory.name, targetDirectory.children); + printEntries(writer, intro, screenWidth, targetDirectory.children); + } + } + + static void printEntries(PrintWriter writer, + String intro, + OptionalInt screenWidth, + List entries) { + if (entries.isEmpty()) { + return; + } + if (!intro.isEmpty()) { + writer.println(intro); + } + ColumnSchema columnSchema = calculateColumnSchema(screenWidth, entries); + int numColumns = columnSchema.numColumns(); + int numLines = (entries.size() + numColumns - 1) / numColumns; + for (int line = 0; line < numLines; line++) { + StringBuilder output = new StringBuilder(); + for (int column = 0; column < numColumns; column++) { + int entryIndex = line + (column * columnSchema.entriesPerColumn()); + if (entryIndex < entries.size()) { + String entry = entries.get(entryIndex); + output.append(entry); + if (column < numColumns - 1) { + int width = columnSchema.columnWidth(column); + for (int i = 0; i < width - entry.length(); i++) { + output.append(" "); + } + } + } + } + writer.println(output.toString()); + } + } + + static ColumnSchema calculateColumnSchema(OptionalInt screenWidth, + List entries) { + if (!screenWidth.isPresent()) { + return new ColumnSchema(1, entries.size()); + } + int maxColumns = screenWidth.getAsInt() / 4; + if (maxColumns <= 1) { + return new ColumnSchema(1, entries.size()); + } + ColumnSchema[] schemas = new ColumnSchema[maxColumns]; + for (int numColumns = 1; numColumns <= maxColumns; numColumns++) { + schemas[numColumns - 1] = new ColumnSchema(numColumns, + (entries.size() + numColumns - 1) / numColumns); + } + for (int i = 0; i < entries.size(); i++) { + String entry = entries.get(i); + for (int s = 0; s < schemas.length; s++) { + ColumnSchema schema = schemas[s]; + schema.process(i, entry); + } + } + for (int s = schemas.length - 1; s > 0; s--) { + ColumnSchema schema = schemas[s]; + if (schema.columnWidths[schema.columnWidths.length - 1] != 0 && + schema.totalWidth() <= screenWidth.getAsInt()) { + return schema; + } + } + return schemas[0]; + } + + static class ColumnSchema { + private final int[] columnWidths; + private final int entriesPerColumn; + + ColumnSchema(int numColumns, int entriesPerColumn) { + this.columnWidths = new int[numColumns]; + this.entriesPerColumn = entriesPerColumn; + } + + ColumnSchema setColumnWidths(Integer... widths) { + for (int i = 0; i < widths.length; i++) { + columnWidths[i] = widths[i]; + } + return this; + } + + void process(int entryIndex, String output) { + int columnIndex = entryIndex / entriesPerColumn; + columnWidths[columnIndex] = Math.max( + columnWidths[columnIndex], output.length() + 2); + } + + int totalWidth() { + int total = 0; + for (int i = 0; i < columnWidths.length; i++) { + total += columnWidths[i]; + } + return total; + } + + int numColumns() { + return columnWidths.length; + } + + int columnWidth(int columnIndex) { + return columnWidths[columnIndex]; + } + + int entriesPerColumn() { + return entriesPerColumn; + } + + @Override + public int hashCode() { + return Objects.hash(columnWidths, entriesPerColumn); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ColumnSchema)) return false; + ColumnSchema other = (ColumnSchema) o; + if (entriesPerColumn != other.entriesPerColumn) return false; + if (!Arrays.equals(columnWidths, other.columnWidths)) return false; + return true; + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder("ColumnSchema(columnWidths=["); + String prefix = ""; + for (int i = 0; i < columnWidths.length; i++) { + bld.append(prefix); + bld.append(columnWidths[i]); + prefix = ", "; + } + bld.append("], entriesPerColumn=").append(entriesPerColumn).append(")"); + return bld.toString(); + } + } + + @Override + public int hashCode() { + return Objects.hashCode(targets); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof LsCommandHandler)) return false; + LsCommandHandler o = (LsCommandHandler) other; + if (!Objects.equals(o.targets, targets)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java new file mode 100644 index 00000000000..dcd0b8cd716 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.jline.reader.Candidate; + +import java.io.PrintWriter; +import java.util.List; +import java.util.Optional; + +/** + * Implements the manual command. + */ +public final class ManCommandHandler implements Commands.Handler { + private final String cmd; + + public final static Commands.Type TYPE = new ManCommandType(); + + public static class ManCommandType implements Commands.Type { + private ManCommandType() { + } + + @Override + public String name() { + return "man"; + } + + @Override + public String description() { + return "Show the help text for a specific command."; + } + + @Override + public boolean shellOnly() { + return true; + } + + @Override + public void addArguments(ArgumentParser parser) { + parser.addArgument("cmd"). + nargs(1). + help("The command to get help text for."); + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + return new ManCommandHandler(namespace.getList("cmd").get(0)); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + if (nextWords.size() == 1) { + CommandUtils.completeCommand(nextWords.get(0), candidates); + } + } + } + + public ManCommandHandler(String cmd) { + this.cmd = cmd; + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) { + Commands.Type type = Commands.TYPES.get(cmd); + if (type == null) { + writer.println("man: unknown command " + cmd + + ". Type help to get a list of commands."); + } else { + ArgumentParser parser = ArgumentParsers.newArgumentParser(type.name(), false); + type.addArguments(parser); + writer.printf("%s: %s%n%n", cmd, type.description()); + parser.printHelp(writer); + } + } + + @Override + public int hashCode() { + return cmd.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ManCommandHandler)) return false; + ManCommandHandler o = (ManCommandHandler) other; + if (!o.cmd.equals(cmd)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java new file mode 100644 index 00000000000..3764a17b8b0 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * A node in the metadata tool. + */ +public interface MetadataNode { + class DirectoryNode implements MetadataNode { + private final TreeMap children = new TreeMap<>(); + + public DirectoryNode mkdirs(String... names) { + if (names.length == 0) { + throw new RuntimeException("Invalid zero-length path"); + } + DirectoryNode node = this; + for (int i = 0; i < names.length; i++) { + MetadataNode nextNode = node.children.get(names[i]); + if (nextNode == null) { + nextNode = new DirectoryNode(); + node.children.put(names[i], nextNode); + } else { + if (!(nextNode instanceof DirectoryNode)) { + throw new NotDirectoryException(); + } + } + node = (DirectoryNode) nextNode; + } + return node; + } + + public void rmrf(String... names) { + if (names.length == 0) { + throw new RuntimeException("Invalid zero-length path"); + } + DirectoryNode node = this; + for (int i = 0; i < names.length - 1; i++) { + MetadataNode nextNode = node.children.get(names[i]); + if (nextNode == null || !(nextNode instanceof DirectoryNode)) { + throw new RuntimeException("Unable to locate directory /" + + String.join("/", names)); + } + node = (DirectoryNode) nextNode; + } + node.children.remove(names[names.length - 1]); + } + + public FileNode create(String name) { + MetadataNode node = children.get(name); + if (node == null) { + node = new FileNode(); + children.put(name, node); + } else { + if (!(node instanceof FileNode)) { + throw new NotFileException(); + } + } + return (FileNode) node; + } + + public MetadataNode child(String component) { + return children.get(component); + } + + public NavigableMap children() { + return children; + } + + public void addChild(String name, DirectoryNode child) { + children.put(name, child); + } + + public DirectoryNode directory(String... names) { + if (names.length == 0) { + throw new RuntimeException("Invalid zero-length path"); + } + DirectoryNode node = this; + for (int i = 0; i < names.length; i++) { + MetadataNode nextNode = node.children.get(names[i]); + if (nextNode == null || !(nextNode instanceof DirectoryNode)) { + throw new RuntimeException("Unable to locate directory /" + + String.join("/", names)); + } + node = (DirectoryNode) nextNode; + } + return node; + } + + public FileNode file(String... names) { + if (names.length == 0) { + throw new RuntimeException("Invalid zero-length path"); + } + DirectoryNode node = this; + for (int i = 0; i < names.length - 1; i++) { + MetadataNode nextNode = node.children.get(names[i]); + if (nextNode == null || !(nextNode instanceof DirectoryNode)) { + throw new RuntimeException("Unable to locate file /" + + String.join("/", names)); + } + node = (DirectoryNode) nextNode; + } + MetadataNode nextNode = node.child(names[names.length - 1]); + if (nextNode == null || !(nextNode instanceof FileNode)) { + throw new RuntimeException("Unable to locate file /" + + String.join("/", names)); + } + return (FileNode) nextNode; + } + } + + class FileNode implements MetadataNode { + private String contents; + + void setContents(String contents) { + this.contents = contents; + } + + String contents() { + return contents; + } + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java new file mode 100644 index 00000000000..7910285d484 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.IsrChangeRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.PartitionRecordJsonConverter; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.metadata.UnfenceBrokerRecord; +import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metalog.MetaLogLeader; +import org.apache.kafka.metalog.MetaLogListener; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.RaftClient; +import org.apache.kafka.shell.MetadataNode.DirectoryNode; +import org.apache.kafka.shell.MetadataNode.FileNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * Maintains the in-memory metadata for the metadata tool. + */ +public final class MetadataNodeManager implements AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class); + + public static class Data { + private final DirectoryNode root = new DirectoryNode(); + private String workingDirectory = "/"; + + public DirectoryNode root() { + return root; + } + + public String workingDirectory() { + return workingDirectory; + } + + public void setWorkingDirectory(String workingDirectory) { + this.workingDirectory = workingDirectory; + } + } + + class LogListener implements MetaLogListener, RaftClient.Listener { + @Override + public void handleCommit(BatchReader reader) { + try { + // TODO: handle lastOffset + while (reader.hasNext()) { + BatchReader.Batch batch = reader.next(); + for (ApiMessageAndVersion messageAndVersion : batch.records()) { + handleMessage(messageAndVersion.message()); + } + } + } finally { + reader.close(); + } + } + + @Override + public void handleCommits(long lastOffset, List messages) { + appendEvent("handleCommits", () -> { + log.error("handleCommits " + messages + " at offset " + lastOffset); + DirectoryNode dir = data.root.mkdirs("metadataQuorum"); + dir.create("offset").setContents(String.valueOf(lastOffset)); + for (ApiMessage message : messages) { + handleMessage(message); + } + }, null); + } + + @Override + public void handleNewLeader(MetaLogLeader leader) { + appendEvent("handleNewLeader", () -> { + log.error("handleNewLeader " + leader); + DirectoryNode dir = data.root.mkdirs("metadataQuorum"); + dir.create("leader").setContents(leader.toString()); + }, null); + } + + @Override + public void handleClaim(int epoch) { + // This shouldn't happen because we should never be the leader. + log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")"); + } + + @Override + public void handleRenounce(long epoch) { + // This shouldn't happen because we should never be the leader. + log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")"); + } + + @Override + public void beginShutdown() { + log.debug("MetaLogListener sent beginShutdown"); + } + } + + private final Data data = new Data(); + private final LogListener logListener = new LogListener(); + private final ObjectMapper objectMapper; + private final KafkaEventQueue queue; + + public MetadataNodeManager() { + this.objectMapper = new ObjectMapper(); + this.objectMapper.registerModule(new Jdk8Module()); + this.queue = new KafkaEventQueue(Time.SYSTEM, + new LogContext("[node-manager-event-queue] "), ""); + } + + public void setup() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + appendEvent("createShellNodes", () -> { + DirectoryNode directory = data.root().mkdirs("local"); + directory.create("version").setContents(AppInfoParser.getVersion()); + directory.create("commitId").setContents(AppInfoParser.getCommitId()); + future.complete(null); + }, future); + future.get(); + } + + public LogListener logListener() { + return logListener; + } + + @Override + public void close() throws Exception { + queue.close(); + } + + public void visit(Consumer consumer) throws Exception { + CompletableFuture future = new CompletableFuture<>(); + appendEvent("visit", () -> { + consumer.accept(data); + future.complete(null); + }, future); + future.get(); + } + + private void appendEvent(String name, Runnable runnable, CompletableFuture future) { + queue.append(new EventQueue.Event() { + @Override + public void run() throws Exception { + runnable.run(); + } + + @Override + public void handleException(Throwable e) { + log.error("Unexpected error while handling event " + name, e); + if (future != null) { + future.completeExceptionally(e); + } + } + }); + } + + private void handleMessage(ApiMessage message) { + try { + MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); + handleCommitImpl(type, message); + } catch (Exception e) { + log.error("Error processing record of type " + message.apiKey(), e); + } + } + + private void handleCommitImpl(MetadataRecordType type, ApiMessage message) + throws Exception { + switch (type) { + case REGISTER_BROKER_RECORD: { + DirectoryNode brokersNode = data.root.mkdirs("brokers"); + RegisterBrokerRecord record = (RegisterBrokerRecord) message; + DirectoryNode brokerNode = brokersNode. + mkdirs(Integer.toString(record.brokerId())); + FileNode registrationNode = brokerNode.create("registration"); + registrationNode.setContents(record.toString()); + brokerNode.create("isFenced").setContents("true"); + break; + } + case UNREGISTER_BROKER_RECORD: { + UnregisterBrokerRecord record = (UnregisterBrokerRecord) message; + data.root.rmrf("brokers", Integer.toString(record.brokerId())); + break; + } + case TOPIC_RECORD: { + TopicRecord record = (TopicRecord) message; + DirectoryNode topicsDirectory = data.root.mkdirs("topics"); + DirectoryNode topicDirectory = topicsDirectory.mkdirs(record.name()); + topicDirectory.create("id").setContents(record.topicId().toString()); + topicDirectory.create("name").setContents(record.name().toString()); + DirectoryNode topicIdsDirectory = data.root.mkdirs("topicIds"); + topicIdsDirectory.addChild(record.topicId().toString(), topicDirectory); + break; + } + case PARTITION_RECORD: { + PartitionRecord record = (PartitionRecord) message; + DirectoryNode topicDirectory = + data.root.mkdirs("topicIds").mkdirs(record.topicId().toString()); + DirectoryNode partitionDirectory = + topicDirectory.mkdirs(Integer.toString(record.partitionId())); + JsonNode node = PartitionRecordJsonConverter. + write(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION); + partitionDirectory.create("data").setContents(node.toPrettyString()); + break; + } + case CONFIG_RECORD: { + ConfigRecord record = (ConfigRecord) message; + String typeString = ""; + switch (ConfigResource.Type.forId(record.resourceType())) { + case BROKER: + typeString = "broker"; + break; + case TOPIC: + typeString = "topic"; + break; + default: + throw new RuntimeException("Error processing CONFIG_RECORD: " + + "Can't handle ConfigResource.Type " + record.resourceType()); + } + DirectoryNode configDirectory = data.root.mkdirs("configs"). + mkdirs(typeString).mkdirs(record.resourceName()); + if (record.value() == null) { + configDirectory.rmrf(record.name()); + } else { + configDirectory.create(record.name()).setContents(record.value()); + } + break; + } + case ISR_CHANGE_RECORD: { + IsrChangeRecord record = (IsrChangeRecord) message; + FileNode file = data.root.file("topicIds", record.topicId().toString(), + Integer.toString(record.partitionId()), "data"); + JsonNode node = objectMapper.readTree(file.contents()); + PartitionRecord partition = PartitionRecordJsonConverter. + read(node, PartitionRecord.HIGHEST_SUPPORTED_VERSION); + partition.setIsr(record.isr()); + partition.setLeader(record.leader()); + partition.setLeaderEpoch(record.leaderEpoch()); + partition.setPartitionEpoch(record.partitionEpoch()); + file.setContents(PartitionRecordJsonConverter.write(partition, + PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString()); + break; + } + case FENCE_BROKER_RECORD: { + FenceBrokerRecord record = (FenceBrokerRecord) message; + data.root.mkdirs("brokers", Integer.toString(record.id())). + create("isFenced").setContents("true"); + break; + } + case UNFENCE_BROKER_RECORD: { + UnfenceBrokerRecord record = (UnfenceBrokerRecord) message; + data.root.mkdirs("brokers", Integer.toString(record.id())). + create("isFenced").setContents("false"); + break; + } + case REMOVE_TOPIC_RECORD: { + RemoveTopicRecord record = (RemoveTopicRecord) message; + DirectoryNode topicsDirectory = + data.root.directory("topicIds", record.topicId().toString()); + String name = topicsDirectory.file("name").contents(); + data.root.rmrf("topics", name); + data.root.rmrf("topicIds", record.topicId().toString()); + break; + } + default: + throw new RuntimeException("Unhandled metadata record type"); + } + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java new file mode 100644 index 00000000000..b701310efb7 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import kafka.raft.KafkaRaftManager; +import kafka.tools.TerseFailure; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + + +/** + * The Kafka metadata shell. + */ +public final class MetadataShell { + private static final Logger log = LoggerFactory.getLogger(MetadataShell.class); + + public static class Builder { + private String snapshotPath; + + public Builder setSnapshotPath(String snapshotPath) { + this.snapshotPath = snapshotPath; + return this; + } + + public MetadataShell build() throws Exception { + MetadataNodeManager nodeManager = null; + SnapshotFileReader reader = null; + try { + nodeManager = new MetadataNodeManager(); + reader = new SnapshotFileReader(snapshotPath, nodeManager.logListener()); + return new MetadataShell(null, reader, nodeManager); + } catch (Throwable e) { + log.error("Initialization error", e); + if (reader != null) { + reader.close(); + } + if (nodeManager != null) { + nodeManager.close(); + } + throw e; + } + } + } + + private final KafkaRaftManager raftManager; + + private final SnapshotFileReader snapshotFileReader; + + private final MetadataNodeManager nodeManager; + + public MetadataShell(KafkaRaftManager raftManager, + SnapshotFileReader snapshotFileReader, + MetadataNodeManager nodeManager) { + this.raftManager = raftManager; + this.snapshotFileReader = snapshotFileReader; + this.nodeManager = nodeManager; + } + + public void run(List args) throws Exception { + nodeManager.setup(); + if (raftManager != null) { + raftManager.startup(); + raftManager.register(nodeManager.logListener()); + } else if (snapshotFileReader != null) { + snapshotFileReader.startup(); + } else { + throw new RuntimeException("Expected either a raft manager or snapshot reader"); + } + if (args == null || args.isEmpty()) { + // Interactive mode. + try (InteractiveShell shell = new InteractiveShell(nodeManager)) { + shell.runMainLoop(); + } + } else { + // Non-interactive mode. + Commands commands = new Commands(false); + try (PrintWriter writer = new PrintWriter(new BufferedWriter( + new OutputStreamWriter(System.out, StandardCharsets.UTF_8)))) { + Commands.Handler handler = commands.parseCommand(args); + handler.run(Optional.empty(), writer, nodeManager); + writer.flush(); + } + } + } + + public void close() throws Exception { + if (raftManager != null) { + raftManager.shutdown(); + } + if (snapshotFileReader != null) { + snapshotFileReader.close(); + } + nodeManager.close(); + } + + public static void main(String[] args) throws Exception { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("metadata-tool") + .defaultHelp(true) + .description("The Apache Kafka metadata tool"); + parser.addArgument("--snapshot", "-s") + .type(String.class) + .help("The snapshot file to read."); + parser.addArgument("command") + .nargs("*") + .help("The command to run."); + Namespace res = parser.parseArgsOrFail(args); + try { + Builder builder = new Builder(); + builder.setSnapshotPath(res.getString("snapshot")); + Path tempDir = Files.createTempDirectory("MetadataShell"); + Exit.addShutdownHook("agent-shutdown-hook", () -> { + log.debug("Removing temporary directory " + tempDir.toAbsolutePath().toString()); + try { + Utils.delete(tempDir.toFile()); + } catch (Exception e) { + log.error("Got exception while removing temporary directory " + + tempDir.toAbsolutePath().toString()); + } + }); + MetadataShell shell = builder.build(); + shell.waitUntilCaughtUp(); + try { + shell.run(res.getList("command")); + } finally { + shell.close(); + } + Exit.exit(0); + } catch (TerseFailure e) { + System.err.println("Error: " + e.getMessage()); + Exit.exit(1); + } catch (Throwable e) { + System.err.println("Unexpected error: " + + (e.getMessage() == null ? "" : e.getMessage())); + e.printStackTrace(System.err); + Exit.exit(1); + } + } + + void waitUntilCaughtUp() throws ExecutionException, InterruptedException { + snapshotFileReader.caughtUpFuture().get(); + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java new file mode 100644 index 00000000000..1756ba76aa8 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import java.io.PrintWriter; +import java.util.Optional; + +/** + * Does nothing. + */ +public final class NoOpCommandHandler implements Commands.Handler { + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) { + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof NoOpCommandHandler)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java b/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java new file mode 100644 index 00000000000..692534758e2 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +/** + * An exception that is thrown when a non-directory node is treated like a + * directory. + */ +public class NotDirectoryException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public NotDirectoryException() { + super(); + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/NotFileException.java b/shell/src/main/java/org/apache/kafka/shell/NotFileException.java new file mode 100644 index 00000000000..cbc2a832d67 --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/NotFileException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +/** + * An exception that is thrown when a non-file node is treated like a + * file. + */ +public class NotFileException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public NotFileException() { + super(); + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java new file mode 100644 index 00000000000..1e5b5da39ef --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import org.jline.reader.Candidate; + +import java.io.PrintWriter; +import java.util.List; +import java.util.Optional; + +/** + * Implements the pwd command. + */ +public final class PwdCommandHandler implements Commands.Handler { + public final static Commands.Type TYPE = new PwdCommandType(); + + public static class PwdCommandType implements Commands.Type { + private PwdCommandType() { + } + + @Override + public String name() { + return "pwd"; + } + + @Override + public String description() { + return "Print the current working directory."; + } + + @Override + public boolean shellOnly() { + return true; + } + + @Override + public void addArguments(ArgumentParser parser) { + // nothing to do + } + + @Override + public Commands.Handler createHandler(Namespace namespace) { + return new PwdCommandHandler(); + } + + @Override + public void completeNext(MetadataNodeManager nodeManager, List nextWords, + List candidates) throws Exception { + // nothing to do + } + } + + @Override + public void run(Optional shell, + PrintWriter writer, + MetadataNodeManager manager) throws Exception { + manager.visit(data -> { + writer.println(data.workingDirectory()); + }); + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof PwdCommandHandler)) return false; + return true; + } +} diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java new file mode 100644 index 00000000000..e566be67d7d --- /dev/null +++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metalog.MetaLogLeader; +import org.apache.kafka.metalog.MetaLogListener; +import org.apache.kafka.queue.EventQueue; +import org.apache.kafka.queue.KafkaEventQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; + + +/** + * Reads Kafka metadata snapshots. + */ +public final class SnapshotFileReader implements AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(SnapshotFileReader.class); + + private final String snapshotPath; + private final MetaLogListener listener; + private final KafkaEventQueue queue; + private final CompletableFuture caughtUpFuture; + private FileRecords fileRecords; + private Iterator batchIterator; + + public SnapshotFileReader(String snapshotPath, MetaLogListener listener) { + this.snapshotPath = snapshotPath; + this.listener = listener; + this.queue = new KafkaEventQueue(Time.SYSTEM, + new LogContext("[snapshotReaderQueue] "), "snapshotReaderQueue_"); + this.caughtUpFuture = new CompletableFuture<>(); + } + + public void startup() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + queue.append(new EventQueue.Event() { + @Override + public void run() throws Exception { + fileRecords = FileRecords.open(new File(snapshotPath), false); + batchIterator = fileRecords.batches().iterator(); + scheduleHandleNextBatch(); + future.complete(null); + } + + @Override + public void handleException(Throwable e) { + future.completeExceptionally(e); + beginShutdown("startup error"); + } + }); + future.get(); + } + + private void handleNextBatch() { + if (!batchIterator.hasNext()) { + beginShutdown("done"); + return; + } + FileChannelRecordBatch batch = batchIterator.next(); + if (batch.isControlBatch()) { + handleControlBatch(batch); + } else { + handleMetadataBatch(batch); + } + scheduleHandleNextBatch(); + } + + private void scheduleHandleNextBatch() { + queue.append(new EventQueue.Event() { + @Override + public void run() throws Exception { + handleNextBatch(); + } + + @Override + public void handleException(Throwable e) { + log.error("Unexpected error while handling a batch of events", e); + beginShutdown("handleBatch error"); + } + }); + } + + private void handleControlBatch(FileChannelRecordBatch batch) { + for (Iterator iter = batch.iterator(); iter.hasNext(); ) { + Record record = iter.next(); + try { + short typeId = ControlRecordType.parseTypeId(record.key()); + ControlRecordType type = ControlRecordType.fromTypeId(typeId); + switch (type) { + case LEADER_CHANGE: + LeaderChangeMessage message = new LeaderChangeMessage(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + listener.handleNewLeader(new MetaLogLeader(message.leaderId(), + batch.partitionLeaderEpoch())); + break; + default: + log.error("Ignoring control record with type {} at offset {}", + type, record.offset()); + } + } catch (Throwable e) { + log.error("unable to read control record at offset {}", record.offset(), e); + } + } + } + + private void handleMetadataBatch(FileChannelRecordBatch batch) { + List messages = new ArrayList<>(); + for (Iterator iter = batch.iterator(); iter.hasNext(); ) { + Record record = iter.next(); + ByteBufferAccessor accessor = new ByteBufferAccessor(record.value()); + try { + int apiKey = accessor.readUnsignedVarint(); + if (apiKey > Short.MAX_VALUE || apiKey < 0) { + throw new RuntimeException("Invalid apiKey value " + apiKey); + } + int apiVersion = accessor.readUnsignedVarint(); + if (apiVersion > Short.MAX_VALUE || apiVersion < 0) { + throw new RuntimeException("Invalid apiVersion value " + apiVersion); + } + ApiMessage message = MetadataRecordType.fromId((short) apiKey).newMetadataRecord(); + message.read(accessor, (short) apiVersion); + messages.add(message); + } catch (Throwable e) { + log.error("unable to read metadata record at offset {}", record.offset(), e); + } + } + listener.handleCommits(batch.lastOffset(), messages); + } + + public void beginShutdown(String reason) { + if (reason.equals("done")) { + caughtUpFuture.complete(null); + } else { + caughtUpFuture.completeExceptionally(new RuntimeException(reason)); + } + queue.beginShutdown(reason, new EventQueue.Event() { + @Override + public void run() throws Exception { + listener.beginShutdown(); + if (fileRecords != null) { + fileRecords.close(); + fileRecords = null; + } + batchIterator = null; + } + + @Override + public void handleException(Throwable e) { + log.error("shutdown error", e); + } + }); + } + + @Override + public void close() throws Exception { + beginShutdown("closing"); + queue.close(); + } + + public CompletableFuture caughtUpFuture() { + return caughtUpFuture; + } +} diff --git a/shell/src/test/java/org/apache/kafka/shell/CommandTest.java b/shell/src/test/java/org/apache/kafka/shell/CommandTest.java new file mode 100644 index 00000000000..c896a06caa0 --- /dev/null +++ b/shell/src/test/java/org/apache/kafka/shell/CommandTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +@Timeout(value = 120000, unit = MILLISECONDS) +public class CommandTest { + @Test + public void testParseCommands() { + assertEquals(new CatCommandHandler(Arrays.asList("foo")), + new Commands(true).parseCommand(Arrays.asList("cat", "foo"))); + assertEquals(new CdCommandHandler(Optional.empty()), + new Commands(true).parseCommand(Arrays.asList("cd"))); + assertEquals(new CdCommandHandler(Optional.of("foo")), + new Commands(true).parseCommand(Arrays.asList("cd", "foo"))); + assertEquals(new ExitCommandHandler(), + new Commands(true).parseCommand(Arrays.asList("exit"))); + assertEquals(new HelpCommandHandler(), + new Commands(true).parseCommand(Arrays.asList("help"))); + assertEquals(new HistoryCommandHandler(3), + new Commands(true).parseCommand(Arrays.asList("history", "3"))); + assertEquals(new HistoryCommandHandler(Integer.MAX_VALUE), + new Commands(true).parseCommand(Arrays.asList("history"))); + assertEquals(new LsCommandHandler(Collections.emptyList()), + new Commands(true).parseCommand(Arrays.asList("ls"))); + assertEquals(new LsCommandHandler(Arrays.asList("abc", "123")), + new Commands(true).parseCommand(Arrays.asList("ls", "abc", "123"))); + assertEquals(new PwdCommandHandler(), + new Commands(true).parseCommand(Arrays.asList("pwd"))); + } + + @Test + public void testParseInvalidCommand() { + assertEquals(new ErroneousCommandHandler("invalid choice: 'blah' (choose " + + "from 'cat', 'cd', 'exit', 'find', 'help', 'history', 'ls', 'man', 'pwd')"), + new Commands(true).parseCommand(Arrays.asList("blah"))); + } + + @Test + public void testEmptyCommandLine() { + assertEquals(new NoOpCommandHandler(), + new Commands(true).parseCommand(Arrays.asList(""))); + assertEquals(new NoOpCommandHandler(), + new Commands(true).parseCommand(Collections.emptyList())); + } +} diff --git a/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java b/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java new file mode 100644 index 00000000000..90c3b5c1489 --- /dev/null +++ b/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; + +@Timeout(value = 120000, unit = MILLISECONDS) +public class CommandUtilsTest { + @Test + public void testSplitPath() { + assertEquals(Arrays.asList("alpha", "beta"), + CommandUtils.splitPath("/alpha/beta")); + assertEquals(Arrays.asList("alpha", "beta"), + CommandUtils.splitPath("//alpha/beta/")); + } +} diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java new file mode 100644 index 00000000000..da3a7ec1081 --- /dev/null +++ b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(value = 120000, unit = MILLISECONDS) +public class GlobComponentTest { + private void verifyIsLiteral(GlobComponent globComponent, String component) { + assertTrue(globComponent.literal()); + assertEquals(component, globComponent.component()); + assertTrue(globComponent.matches(component)); + assertFalse(globComponent.matches(component + "foo")); + } + + @Test + public void testLiteralComponent() { + verifyIsLiteral(new GlobComponent("abc"), "abc"); + verifyIsLiteral(new GlobComponent(""), ""); + verifyIsLiteral(new GlobComponent("foobar_123"), "foobar_123"); + verifyIsLiteral(new GlobComponent("$blah+"), "$blah+"); + } + + @Test + public void testToRegularExpression() { + assertEquals(null, GlobComponent.toRegularExpression("blah")); + assertEquals(null, GlobComponent.toRegularExpression("")); + assertEquals(null, GlobComponent.toRegularExpression("does not need a regex, actually")); + assertEquals("^\\$blah.*$", GlobComponent.toRegularExpression("$blah*")); + assertEquals("^.*$", GlobComponent.toRegularExpression("*")); + assertEquals("^foo(?:(?:bar)|(?:baz))$", GlobComponent.toRegularExpression("foo{bar,baz}")); + } + + @Test + public void testGlobMatch() { + GlobComponent star = new GlobComponent("*"); + assertFalse(star.literal()); + assertTrue(star.matches("")); + assertTrue(star.matches("anything")); + GlobComponent question = new GlobComponent("b?b"); + assertFalse(question.literal()); + assertFalse(question.matches("")); + assertTrue(question.matches("bob")); + assertTrue(question.matches("bib")); + assertFalse(question.matches("bic")); + GlobComponent foobarOrFoobaz = new GlobComponent("foo{bar,baz}"); + assertFalse(foobarOrFoobaz.literal()); + assertTrue(foobarOrFoobaz.matches("foobar")); + assertTrue(foobarOrFoobaz.matches("foobaz")); + assertFalse(foobarOrFoobaz.matches("foobah")); + assertFalse(foobarOrFoobaz.matches("foo")); + assertFalse(foobarOrFoobaz.matches("baz")); + } +} diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java b/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java new file mode 100644 index 00000000000..59eeb5db79e --- /dev/null +++ b/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; + +@Timeout(value = 120000, unit = MILLISECONDS) +public class GlobVisitorTest { + static private final MetadataNodeManager.Data DATA; + + static { + DATA = new MetadataNodeManager.Data(); + DATA.root().mkdirs("alpha", "beta", "gamma"); + DATA.root().mkdirs("alpha", "theta"); + DATA.root().mkdirs("foo", "a"); + DATA.root().mkdirs("foo", "beta"); + DATA.root().mkdirs("zeta").create("c"); + DATA.root().mkdirs("zeta"); + DATA.root().create("zzz"); + DATA.setWorkingDirectory("foo"); + } + + static class InfoConsumer implements Consumer> { + private Optional> infos = null; + + @Override + public void accept(Optional info) { + if (infos == null) { + if (info.isPresent()) { + infos = Optional.of(new ArrayList<>()); + infos.get().add(info.get()); + } else { + infos = Optional.empty(); + } + } else { + if (info.isPresent()) { + infos.get().add(info.get()); + } else { + throw new RuntimeException("Saw non-empty info after seeing empty info"); + } + } + } + } + + @Test + public void testStarGlob() { + InfoConsumer consumer = new InfoConsumer(); + GlobVisitor visitor = new GlobVisitor("*", consumer); + visitor.accept(DATA); + assertEquals(Optional.of(Arrays.asList( + new MetadataNodeInfo(new String[] {"foo", "a"}, + DATA.root().directory("foo").child("a")), + new MetadataNodeInfo(new String[] {"foo", "beta"}, + DATA.root().directory("foo").child("beta")))), consumer.infos); + } + + @Test + public void testDotDot() { + InfoConsumer consumer = new InfoConsumer(); + GlobVisitor visitor = new GlobVisitor("..", consumer); + visitor.accept(DATA); + assertEquals(Optional.of(Arrays.asList( + new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos); + } + + @Test + public void testDoubleDotDot() { + InfoConsumer consumer = new InfoConsumer(); + GlobVisitor visitor = new GlobVisitor("../..", consumer); + visitor.accept(DATA); + assertEquals(Optional.of(Arrays.asList( + new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos); + } + + @Test + public void testZGlob() { + InfoConsumer consumer = new InfoConsumer(); + GlobVisitor visitor = new GlobVisitor("../z*", consumer); + visitor.accept(DATA); + assertEquals(Optional.of(Arrays.asList( + new MetadataNodeInfo(new String[] {"zeta"}, + DATA.root().child("zeta")), + new MetadataNodeInfo(new String[] {"zzz"}, + DATA.root().child("zzz")))), consumer.infos); + } + + @Test + public void testBetaOrThetaGlob() { + InfoConsumer consumer = new InfoConsumer(); + GlobVisitor visitor = new GlobVisitor("../*/{beta,theta}", consumer); + visitor.accept(DATA); + assertEquals(Optional.of(Arrays.asList( + new MetadataNodeInfo(new String[] {"alpha", "beta"}, + DATA.root().directory("alpha").child("beta")), + new MetadataNodeInfo(new String[] {"alpha", "theta"}, + DATA.root().directory("alpha").child("theta")), + new MetadataNodeInfo(new String[] {"foo", "beta"}, + DATA.root().directory("foo").child("beta")))), consumer.infos); + } + + @Test + public void testNotFoundGlob() { + InfoConsumer consumer = new InfoConsumer(); + GlobVisitor visitor = new GlobVisitor("epsilon", consumer); + visitor.accept(DATA); + assertEquals(Optional.empty(), consumer.infos); + } + + @Test + public void testAbsoluteGlob() { + InfoConsumer consumer = new InfoConsumer(); + GlobVisitor visitor = new GlobVisitor("/a?pha", consumer); + visitor.accept(DATA); + assertEquals(Optional.of(Arrays.asList( + new MetadataNodeInfo(new String[] {"alpha"}, + DATA.root().directory("alpha")))), consumer.infos); + } +} diff --git a/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java b/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java new file mode 100644 index 00000000000..c845706f1b6 --- /dev/null +++ b/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.kafka.shell.LsCommandHandler.ColumnSchema; +import org.apache.kafka.shell.LsCommandHandler.TargetDirectory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.OptionalInt; + +@Timeout(value = 120000, unit = MILLISECONDS) +public class LsCommandHandlerTest { + @Test + public void testCalculateColumnSchema() { + assertEquals(new ColumnSchema(1, 3), + LsCommandHandler.calculateColumnSchema(OptionalInt.empty(), + Arrays.asList("abc", "def", "ghi"))); + assertEquals(new ColumnSchema(1, 2), + LsCommandHandler.calculateColumnSchema(OptionalInt.of(0), + Arrays.asList("abc", "def"))); + assertEquals(new ColumnSchema(3, 1).setColumnWidths(3, 8, 6), + LsCommandHandler.calculateColumnSchema(OptionalInt.of(80), + Arrays.asList("a", "abcdef", "beta"))); + assertEquals(new ColumnSchema(2, 3).setColumnWidths(10, 7), + LsCommandHandler.calculateColumnSchema(OptionalInt.of(18), + Arrays.asList("alphabet", "beta", "gamma", "theta", "zeta"))); + } + + @Test + public void testPrintEntries() throws Exception { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + try (PrintWriter writer = new PrintWriter(new OutputStreamWriter( + stream, StandardCharsets.UTF_8))) { + LsCommandHandler.printEntries(writer, "", OptionalInt.of(18), + Arrays.asList("alphabet", "beta", "gamma", "theta", "zeta")); + } + assertEquals(String.join(String.format("%n"), Arrays.asList( + "alphabet theta", + "beta zeta", + "gamma")), stream.toString().trim()); + } + } + + @Test + public void testPrintTargets() throws Exception { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + try (PrintWriter writer = new PrintWriter(new OutputStreamWriter( + stream, StandardCharsets.UTF_8))) { + LsCommandHandler.printTargets(writer, OptionalInt.of(18), + Arrays.asList("foo", "foobarbaz", "quux"), Arrays.asList( + new TargetDirectory("/some/dir", + Collections.singletonList("supercalifragalistic")), + new TargetDirectory("/some/other/dir", + Arrays.asList("capability", "delegation", "elephant", + "fungible", "green")))); + } + assertEquals(String.join(String.format("%n"), Arrays.asList( + "foo quux", + "foobarbaz ", + "", + "/some/dir:", + "supercalifragalistic", + "", + "/some/other/dir:", + "capability", + "delegation", + "elephant", + "fungible", + "green")), stream.toString().trim()); + } + } +} + diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java new file mode 100644 index 00000000000..42223c78c80 --- /dev/null +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.kafka.shell.MetadataNode.DirectoryNode; +import org.apache.kafka.shell.MetadataNode.FileNode; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +@Timeout(value = 120000, unit = MILLISECONDS) +public class MetadataNodeTest { + @Test + public void testMkdirs() { + DirectoryNode root = new DirectoryNode(); + DirectoryNode defNode = root.mkdirs("abc", "def"); + DirectoryNode defNode2 = root.mkdirs("abc", "def"); + assertTrue(defNode == defNode2); + DirectoryNode defNode3 = root.directory("abc", "def"); + assertTrue(defNode == defNode3); + root.mkdirs("ghi"); + assertEquals(new HashSet<>(Arrays.asList("abc", "ghi")), root.children().keySet()); + assertEquals(Collections.singleton("def"), root.mkdirs("abc").children().keySet()); + assertEquals(Collections.emptySet(), defNode.children().keySet()); + } + + @Test + public void testRmrf() { + DirectoryNode root = new DirectoryNode(); + DirectoryNode foo = root.mkdirs("foo"); + foo.mkdirs("a"); + foo.mkdirs("b"); + root.mkdirs("baz"); + assertEquals(new HashSet<>(Arrays.asList("foo", "baz")), root.children().keySet()); + root.rmrf("foo", "a"); + assertEquals(new HashSet<>(Arrays.asList("b")), foo.children().keySet()); + root.rmrf("foo"); + assertEquals(new HashSet<>(Collections.singleton("baz")), root.children().keySet()); + } + + @Test + public void testCreateFiles() { + DirectoryNode root = new DirectoryNode(); + DirectoryNode abcdNode = root.mkdirs("abcd"); + FileNode quuxNodde = abcdNode.create("quux"); + quuxNodde.setContents("quux contents"); + assertEquals("quux contents", quuxNodde.contents()); + assertThrows(NotDirectoryException.class, () -> root.mkdirs("abcd", "quux")); + } +}