diff --git a/bin/kafka-metadata-shell.sh b/bin/kafka-metadata-shell.sh
new file mode 100755
index 00000000000..289f0c1b51f
--- /dev/null
+++ b/bin/kafka-metadata-shell.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@"
diff --git a/build.gradle b/build.gradle
index 790cd116139..d03748848ce 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1351,6 +1351,49 @@ project(':tools') {
}
}
+project(':shell') {
+ archivesBaseName = "kafka-shell"
+
+ dependencies {
+ compile libs.argparse4j
+ compile libs.jacksonDatabind
+ compile libs.jacksonJDK8Datatypes
+ compile libs.jline
+ compile libs.slf4jApi
+ compile project(':clients')
+ compile project(':core')
+ compile project(':log4j-appender')
+ compile project(':metadata')
+ compile project(':raft')
+
+ compile libs.jacksonJaxrsJsonProvider
+
+ testCompile project(':clients')
+ testCompile libs.junitJupiter
+
+ testRuntime libs.slf4jlog4j
+ }
+
+ javadoc {
+ enabled = false
+ }
+
+ tasks.create(name: "copyDependantLibs", type: Copy) {
+ from (configurations.testRuntime) {
+ include('jline-*jar')
+ }
+ from (configurations.runtime) {
+ include('jline-*jar')
+ }
+ into "$buildDir/dependant-libs-${versions.scala}"
+ duplicatesStrategy 'exclude'
+ }
+
+ jar {
+ dependsOn 'copyDependantLibs'
+ }
+}
+
project(':streams') {
archivesBaseName = "kafka-streams"
ext.buildStreamsVersionFileName = "kafka-streams-version.properties"
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index aad58b0ddd8..63ed7ab238b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -269,6 +269,23 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 76690bbc829..1cfc630c0ea 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -253,6 +253,10 @@
+
+
+
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 9340e3c3103..30193bc19f6 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -71,6 +71,7 @@ versions += [
jacoco: "0.8.5",
jetty: "9.4.33.v20201020",
jersey: "2.31",
+ jline: "3.12.1",
jmh: "1.27",
hamcrest: "2.2",
log4j: "1.2.17",
@@ -149,6 +150,7 @@ libs += [
jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
jerseyHk2: "org.glassfish.jersey.inject:jersey-hk2:$versions.jersey",
+ jline: "org.jline:jline:$versions.jline",
jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
new file mode 100644
index 00000000000..ef85314e0ef
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.
+ */
+public final class LocalLogManager implements MetaLogManager, AutoCloseable {
+ interface LocalBatch {
+ int size();
+ }
+
+ static class LeaderChangeBatch implements LocalBatch {
+ private final MetaLogLeader newLeader;
+
+ LeaderChangeBatch(MetaLogLeader newLeader) {
+ this.newLeader = newLeader;
+ }
+
+ @Override
+ public int size() {
+ return 1;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof LeaderChangeBatch)) return false;
+ LeaderChangeBatch other = (LeaderChangeBatch) o;
+ if (!other.newLeader.equals(newLeader)) return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(newLeader);
+ }
+
+ @Override
+ public String toString() {
+ return "LeaderChangeBatch(newLeader=" + newLeader + ")";
+ }
+ }
+
+ static class LocalRecordBatch implements LocalBatch {
+ private final List records;
+
+ LocalRecordBatch(List records) {
+ this.records = records;
+ }
+
+ @Override
+ public int size() {
+ return records.size();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof LocalRecordBatch)) return false;
+ LocalRecordBatch other = (LocalRecordBatch) o;
+ if (!other.records.equals(records)) return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(records);
+ }
+
+ @Override
+ public String toString() {
+ return "LocalRecordBatch(records=" + records + ")";
+ }
+ }
+
+ public static class SharedLogData {
+ private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
+ private final HashMap logManagers = new HashMap<>();
+ private final TreeMap batches = new TreeMap<>();
+ private MetaLogLeader leader = new MetaLogLeader(-1, -1);
+ private long prevOffset = -1;
+
+ synchronized void registerLogManager(LocalLogManager logManager) {
+ if (logManagers.put(logManager.nodeId(), logManager) != null) {
+ throw new RuntimeException("Can't have multiple LocalLogManagers " +
+ "with id " + logManager.nodeId());
+ }
+ electLeaderIfNeeded();
+ }
+
+ synchronized void unregisterLogManager(LocalLogManager logManager) {
+ if (!logManagers.remove(logManager.nodeId(), logManager)) {
+ throw new RuntimeException("Log manager " + logManager.nodeId() +
+ " was not found.");
+ }
+ }
+
+ synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
+ if (epoch != leader.epoch()) {
+ log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
+ "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
+ return Long.MAX_VALUE;
+ }
+ if (nodeId != leader.nodeId()) {
+ log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
+ "match the current leader id of {}.", nodeId, leader.nodeId());
+ return Long.MAX_VALUE;
+ }
+ log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
+ long offset = append(batch);
+ electLeaderIfNeeded();
+ return offset;
+ }
+
+ synchronized long append(LocalBatch batch) {
+ prevOffset += batch.size();
+ log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
+ batches.put(prevOffset, batch);
+ if (batch instanceof LeaderChangeBatch) {
+ LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
+ leader = leaderChangeBatch.newLeader;
+ }
+ for (LocalLogManager logManager : logManagers.values()) {
+ logManager.scheduleLogCheck();
+ }
+ return prevOffset;
+ }
+
+ synchronized void electLeaderIfNeeded() {
+ if (leader.nodeId() != -1 || logManagers.isEmpty()) {
+ return;
+ }
+ int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
+ Iterator iter = logManagers.keySet().iterator();
+ Integer nextLeaderNode = null;
+ for (int i = 0; i <= nextLeaderIndex; i++) {
+ nextLeaderNode = iter.next();
+ }
+ MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
+ log.info("Elected new leader: {}.", newLeader);
+ append(new LeaderChangeBatch(newLeader));
+ }
+
+ synchronized Entry nextBatch(long offset) {
+ Entry entry = batches.higherEntry(offset);
+ if (entry == null) {
+ return null;
+ }
+ return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private static class MetaLogListenerData {
+ private long offset = -1;
+ private final MetaLogListener listener;
+
+ MetaLogListenerData(MetaLogListener listener) {
+ this.listener = listener;
+ }
+ }
+
+ private final Logger log;
+
+ private final int nodeId;
+
+ private final SharedLogData shared;
+
+ private final EventQueue eventQueue;
+
+ private boolean initialized = false;
+
+ private boolean shutdown = false;
+
+ private long maxReadOffset = Long.MAX_VALUE;
+
+ private final List listeners = new ArrayList<>();
+
+ private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
+
+ public LocalLogManager(LogContext logContext,
+ int nodeId,
+ SharedLogData shared,
+ String threadNamePrefix) {
+ this.log = logContext.logger(LocalLogManager.class);
+ this.nodeId = nodeId;
+ this.shared = shared;
+ this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
+ shared.registerLogManager(this);
+ }
+
+ private void scheduleLogCheck() {
+ eventQueue.append(() -> {
+ try {
+ log.debug("Node {}: running log check.", nodeId);
+ int numEntriesFound = 0;
+ for (MetaLogListenerData listenerData : listeners) {
+ while (true) {
+ Entry entry = shared.nextBatch(listenerData.offset);
+ if (entry == null) {
+ log.trace("Node {}: reached the end of the log after finding " +
+ "{} entries.", nodeId, numEntriesFound);
+ break;
+ }
+ long entryOffset = entry.getKey();
+ if (entryOffset > maxReadOffset) {
+ log.trace("Node {}: after {} entries, not reading the next " +
+ "entry because its offset is {}, and maxReadOffset is {}.",
+ nodeId, numEntriesFound, entryOffset, maxReadOffset);
+ break;
+ }
+ if (entry.getValue() instanceof LeaderChangeBatch) {
+ LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
+ log.trace("Node {}: handling LeaderChange to {}.",
+ nodeId, batch.newLeader);
+ listenerData.listener.handleNewLeader(batch.newLeader);
+ if (batch.newLeader.epoch() > leader.epoch()) {
+ leader = batch.newLeader;
+ }
+ } else if (entry.getValue() instanceof LocalRecordBatch) {
+ LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
+ log.trace("Node {}: handling LocalRecordBatch with offset {}.",
+ nodeId, entryOffset);
+ listenerData.listener.handleCommits(entryOffset, batch.records);
+ }
+ numEntriesFound++;
+ listenerData.offset = entryOffset;
+ }
+ }
+ log.trace("Completed log check for node " + nodeId);
+ } catch (Exception e) {
+ log.error("Exception while handling log check", e);
+ }
+ });
+ }
+
+ public void beginShutdown() {
+ eventQueue.beginShutdown("beginShutdown", () -> {
+ try {
+ if (initialized && !shutdown) {
+ log.debug("Node {}: beginning shutdown.", nodeId);
+ renounce(leader.epoch());
+ for (MetaLogListenerData listenerData : listeners) {
+ listenerData.listener.beginShutdown();
+ }
+ shared.unregisterLogManager(this);
+ }
+ } catch (Exception e) {
+ log.error("Unexpected exception while sending beginShutdown callbacks", e);
+ }
+ shutdown = true;
+ });
+ }
+
+ @Override
+ public void close() throws InterruptedException {
+ log.debug("Node {}: closing.", nodeId);
+ beginShutdown();
+ eventQueue.close();
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ eventQueue.append(() -> {
+ log.debug("initialized local log manager for node " + nodeId);
+ initialized = true;
+ });
+ }
+
+ @Override
+ public void register(MetaLogListener listener) throws Exception {
+ CompletableFuture future = new CompletableFuture<>();
+ eventQueue.append(() -> {
+ if (shutdown) {
+ log.info("Node {}: can't register because local log manager has " +
+ "already been shut down.", nodeId);
+ future.complete(null);
+ } else if (initialized) {
+ log.info("Node {}: registered MetaLogListener.", nodeId);
+ listeners.add(new MetaLogListenerData(listener));
+ shared.electLeaderIfNeeded();
+ scheduleLogCheck();
+ future.complete(null);
+ } else {
+ log.info("Node {}: can't register because local log manager has not " +
+ "been initialized.", nodeId);
+ future.completeExceptionally(new RuntimeException(
+ "LocalLogManager was not initialized."));
+ }
+ });
+ future.get();
+ }
+
+ @Override
+ public long scheduleWrite(long epoch, List batch) {
+ return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
+ batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+ }
+
+ @Override
+ public void renounce(long epoch) {
+ MetaLogLeader curLeader = leader;
+ MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
+ shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+ }
+
+ @Override
+ public MetaLogLeader leader() {
+ return leader;
+ }
+
+ @Override
+ public int nodeId() {
+ return nodeId;
+ }
+
+ public List listeners() {
+ final CompletableFuture> future = new CompletableFuture<>();
+ eventQueue.append(() -> {
+ future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
+ });
+ try {
+ return future.get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setMaxReadOffset(long maxReadOffset) {
+ CompletableFuture future = new CompletableFuture<>();
+ eventQueue.append(() -> {
+ log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
+ this.maxReadOffset = maxReadOffset;
+ scheduleLogCheck();
+ future.complete(null);
+ });
+ try {
+ future.get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/metadata/src/main/resources/common/metadata/IsrChangeRecord.json b/metadata/src/main/resources/common/metadata/IsrChangeRecord.json
index e4583909095..fd8d8341787 100644
--- a/metadata/src/main/resources/common/metadata/IsrChangeRecord.json
+++ b/metadata/src/main/resources/common/metadata/IsrChangeRecord.json
@@ -28,6 +28,8 @@
{ "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
"about": "The lead replica, or -1 if there is no leader." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
- "about": "An epoch that gets incremented each time we change the ISR." }
+ "about": "An epoch that gets incremented each time we change the partition leader." },
+ { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
+ "about": "An epoch that gets incremented each time we change anything in the partition." }
]
}
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index 79c24c2f1ac..5cc7d1328c9 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -34,6 +34,8 @@
{ "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
"about": "The lead replica, or -1 if there is no leader." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
- "about": "An epoch that gets incremented each time we change the ISR." }
+ "about": "An epoch that gets incremented each time we change the partition leader." },
+ { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
+ "about": "An epoch that gets incremented each time we change anything in the partition." }
]
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
index 6d673b3afdd..41e968c4d5c 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
@@ -82,7 +82,7 @@ public class MetadataParserTest {
PartitionRecord partitionRecord = new PartitionRecord().
setReplicas(longReplicaList);
ObjectSerializationCache cache = new ObjectSerializationCache();
- assertEquals("Event size would be 33554478, but the maximum serialized event " +
+ assertEquals("Event size would be 33554482, but the maximum serialized event " +
"size is 33554432", assertThrows(RuntimeException.class, () -> {
MetadataParser.size(partitionRecord, (short) 0, cache);
}).getMessage());
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
index 9d4eb8b594e..9dd6262ff66 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -51,7 +51,7 @@ public class LocalLogManagerTest {
}
/**
- * Test that the local log maanger will claim leadership.
+ * Test that the local log manager will claim leadership.
*/
@Test
public void testClaimsLeadership() throws Exception {
diff --git a/settings.gradle b/settings.gradle
index 55f77f3bed3..fedfa9a650c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -29,6 +29,7 @@ include 'clients',
'log4j-appender',
'metadata',
'raft',
+ 'shell',
'streams',
'streams:examples',
'streams:streams-scala',
diff --git a/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
new file mode 100644
index 00000000000..3fc94279565
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.jline.reader.Candidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Implements the cat command.
+ */
+public final class CatCommandHandler implements Commands.Handler {
+ private static final Logger log = LoggerFactory.getLogger(CatCommandHandler.class);
+
+ public final static Commands.Type TYPE = new CatCommandType();
+
+ public static class CatCommandType implements Commands.Type {
+ private CatCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "cat";
+ }
+
+ @Override
+ public String description() {
+ return "Show the contents of metadata nodes.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return false;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ parser.addArgument("targets").
+ nargs("+").
+ help("The metadata nodes to display.");
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ return new CatCommandHandler(namespace.getList("targets"));
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
+ candidates);
+ }
+ }
+
+ private final List targets;
+
+ public CatCommandHandler(List targets) {
+ this.targets = targets;
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) throws Exception {
+ log.trace("cat " + targets);
+ for (String target : targets) {
+ manager.visit(new GlobVisitor(target, entryOption -> {
+ if (entryOption.isPresent()) {
+ MetadataNode node = entryOption.get().node();
+ if (node instanceof DirectoryNode) {
+ writer.println("cat: " + target + ": Is a directory");
+ } else if (node instanceof FileNode) {
+ FileNode fileNode = (FileNode) node;
+ writer.println(fileNode.contents());
+ }
+ } else {
+ writer.println("cat: " + target + ": No such file or directory.");
+ }
+ }));
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return targets.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof CatCommandHandler)) return false;
+ CatCommandHandler o = (CatCommandHandler) other;
+ if (!Objects.equals(o.targets, targets)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java
new file mode 100644
index 00000000000..8d270e54328
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Implements the cd command.
+ */
+public final class CdCommandHandler implements Commands.Handler {
+ public final static Commands.Type TYPE = new CdCommandType();
+
+ public static class CdCommandType implements Commands.Type {
+ private CdCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "cd";
+ }
+
+ @Override
+ public String description() {
+ return "Set the current working directory.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return true;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ parser.addArgument("target").
+ nargs("?").
+ help("The directory to change to.");
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ return new CdCommandHandler(Optional.ofNullable(namespace.getString("target")));
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ if (nextWords.size() == 1) {
+ CommandUtils.completePath(nodeManager, nextWords.get(0), candidates);
+ }
+ }
+ }
+
+ private final Optional target;
+
+ public CdCommandHandler(Optional target) {
+ this.target = target;
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) throws Exception {
+ String effectiveTarget = target.orElse("/");
+ manager.visit(new Consumer() {
+ @Override
+ public void accept(MetadataNodeManager.Data data) {
+ new GlobVisitor(effectiveTarget, entryOption -> {
+ if (entryOption.isPresent()) {
+ if (!(entryOption.get().node() instanceof DirectoryNode)) {
+ writer.println("cd: " + effectiveTarget + ": not a directory.");
+ } else {
+ data.setWorkingDirectory(entryOption.get().absolutePath());
+ }
+ } else {
+ writer.println("cd: " + effectiveTarget + ": no such directory.");
+ }
+ }).accept(data);
+ }
+ });
+ }
+
+ @Override
+ public int hashCode() {
+ return target.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof CdCommandHandler)) return false;
+ CdCommandHandler o = (CdCommandHandler) other;
+ if (!o.target.equals(target)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java b/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java
new file mode 100644
index 00000000000..0639172e95d
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.jline.reader.Candidate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+/**
+ * Utility functions for command handlers.
+ */
+public final class CommandUtils {
+ /**
+ * Convert a list of paths into the effective list of paths which should be used.
+ * Empty strings will be removed. If no paths are given, the current working
+ * directory will be used.
+ *
+ * @param paths The input paths. Non-null.
+ *
+ * @return The output paths.
+ */
+ public static List getEffectivePaths(List paths) {
+ List effectivePaths = new ArrayList<>();
+ for (String path : paths) {
+ if (!path.isEmpty()) {
+ effectivePaths.add(path);
+ }
+ }
+ if (effectivePaths.isEmpty()) {
+ effectivePaths.add(".");
+ }
+ return effectivePaths;
+ }
+
+ /**
+ * Generate a list of potential completions for a prefix of a command name.
+ *
+ * @param commandPrefix The command prefix. Non-null.
+ * @param candidates The list to add the output completions to.
+ */
+ public static void completeCommand(String commandPrefix, List candidates) {
+ String command = Commands.TYPES.ceilingKey(commandPrefix);
+ while (command != null && command.startsWith(commandPrefix)) {
+ candidates.add(new Candidate(command));
+ command = Commands.TYPES.higherKey(command);
+ }
+ }
+
+ /**
+ * Convert a path to a list of path components.
+ * Multiple slashes in a row are treated the same as a single slash.
+ * Trailing slashes are ignored.
+ */
+ public static List splitPath(String path) {
+ List results = new ArrayList<>();
+ String[] components = path.split("/");
+ for (int i = 0; i < components.length; i++) {
+ if (!components[i].isEmpty()) {
+ results.add(components[i]);
+ }
+ }
+ return results;
+ }
+
+ public static List stripDotPathComponents(List input) {
+ List output = new ArrayList<>();
+ for (String string : input) {
+ if (string.equals("..")) {
+ if (output.size() > 0) {
+ output.remove(output.size() - 1);
+ }
+ } else if (!string.equals(".")) {
+ output.add(string);
+ }
+ }
+ return output;
+ }
+
+ /**
+ * Generate a list of potential completions for a path.
+ *
+ * @param nodeManager The NodeManager.
+ * @param pathPrefix The path prefix. Non-null.
+ * @param candidates The list to add the output completions to.
+ */
+ public static void completePath(MetadataNodeManager nodeManager,
+ String pathPrefix,
+ List candidates) throws Exception {
+ nodeManager.visit(data -> {
+ String absolutePath = pathPrefix.startsWith("/") ?
+ pathPrefix : data.workingDirectory() + "/" + pathPrefix;
+ List pathComponents = stripDotPathComponents(splitPath(absolutePath));
+ DirectoryNode directory = data.root();
+ int numDirectories = pathPrefix.endsWith("/") ?
+ pathComponents.size() : pathComponents.size() - 1;
+ for (int i = 0; i < numDirectories; i++) {
+ MetadataNode node = directory.child(pathComponents.get(i));
+ if (node == null || !(node instanceof DirectoryNode)) {
+ return;
+ }
+ directory = (DirectoryNode) node;
+ }
+ String lastComponent = "";
+ if (numDirectories >= 0 && numDirectories < pathComponents.size()) {
+ lastComponent = pathComponents.get(numDirectories);
+ }
+ Entry candidate =
+ directory.children().ceilingEntry(lastComponent);
+ String effectivePrefix;
+ int lastSlash = pathPrefix.lastIndexOf('/');
+ if (lastSlash < 0) {
+ effectivePrefix = "";
+ } else {
+ effectivePrefix = pathPrefix.substring(0, lastSlash + 1);
+ }
+ while (candidate != null && candidate.getKey().startsWith(lastComponent)) {
+ StringBuilder candidateBuilder = new StringBuilder();
+ candidateBuilder.append(effectivePrefix).append(candidate.getKey());
+ boolean complete = true;
+ if (candidate.getValue() instanceof DirectoryNode) {
+ candidateBuilder.append("/");
+ complete = false;
+ }
+ candidates.add(new Candidate(candidateBuilder.toString(),
+ candidateBuilder.toString(), null, null, null, null, complete));
+ candidate = directory.children().higherEntry(candidate.getKey());
+ }
+ });
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/Commands.java b/shell/src/main/java/org/apache/kafka/shell/Commands.java
new file mode 100644
index 00000000000..db16411ebae
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/Commands.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import net.sourceforge.argparse4j.internal.HelpScreenException;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+
+/**
+ * The commands for the Kafka metadata tool.
+ */
+public final class Commands {
+ /**
+ * A map from command names to command types.
+ */
+ static final NavigableMap TYPES;
+
+ static {
+ TreeMap typesMap = new TreeMap<>();
+ for (Type type : Arrays.asList(
+ CatCommandHandler.TYPE,
+ CdCommandHandler.TYPE,
+ ExitCommandHandler.TYPE,
+ FindCommandHandler.TYPE,
+ HelpCommandHandler.TYPE,
+ HistoryCommandHandler.TYPE,
+ LsCommandHandler.TYPE,
+ ManCommandHandler.TYPE,
+ PwdCommandHandler.TYPE)) {
+ typesMap.put(type.name(), type);
+ }
+ TYPES = Collections.unmodifiableNavigableMap(typesMap);
+ }
+
+ /**
+ * Command handler objects are instantiated with specific arguments to
+ * execute commands.
+ */
+ public interface Handler {
+ void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) throws Exception;
+ }
+
+ /**
+ * An object which describes a type of command handler. This includes
+ * information like its name, help text, and whether it should be accessible
+ * from non-interactive mode.
+ */
+ public interface Type {
+ String name();
+ String description();
+ boolean shellOnly();
+ void addArguments(ArgumentParser parser);
+ Handler createHandler(Namespace namespace);
+ void completeNext(MetadataNodeManager nodeManager,
+ List nextWords,
+ List candidates) throws Exception;
+ }
+
+ private final ArgumentParser parser;
+
+ /**
+ * Create the commands instance.
+ *
+ * @param addShellCommands True if we should include the shell-only commands.
+ */
+ public Commands(boolean addShellCommands) {
+ this.parser = ArgumentParsers.newArgumentParser("", false);
+ Subparsers subparsers = this.parser.addSubparsers().dest("command");
+ for (Type type : TYPES.values()) {
+ if (addShellCommands || !type.shellOnly()) {
+ Subparser subParser = subparsers.addParser(type.name());
+ subParser.help(type.description());
+ type.addArguments(subParser);
+ }
+ }
+ }
+
+ ArgumentParser parser() {
+ return parser;
+ }
+
+ /**
+ * Handle the given command.
+ *
+ * In general this function should not throw exceptions. Instead, it should
+ * return ErroneousCommandHandler if the input was invalid.
+ *
+ * @param arguments The command line arguments.
+ * @return The command handler.
+ */
+ public Handler parseCommand(List arguments) {
+ List trimmedArguments = new ArrayList<>(arguments);
+ while (true) {
+ if (trimmedArguments.isEmpty()) {
+ return new NoOpCommandHandler();
+ }
+ String last = trimmedArguments.get(trimmedArguments.size() - 1);
+ if (!last.isEmpty()) {
+ break;
+ }
+ trimmedArguments.remove(trimmedArguments.size() - 1);
+ }
+ Namespace namespace;
+ try {
+ namespace = parser.parseArgs(trimmedArguments.toArray(new String[0]));
+ } catch (HelpScreenException e) {
+ return new NoOpCommandHandler();
+ } catch (ArgumentParserException e) {
+ return new ErroneousCommandHandler(e.getMessage());
+ }
+ String command = namespace.get("command");
+ if (!command.equals(trimmedArguments.get(0))) {
+ return new ErroneousCommandHandler("invalid choice: '" +
+ trimmedArguments.get(0) + "': did you mean '" + command + "'?");
+ }
+ Type type = TYPES.get(command);
+ if (type == null) {
+ return new ErroneousCommandHandler("Unknown command specified: " + command);
+ } else {
+ return type.createHandler(namespace);
+ }
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java
new file mode 100644
index 00000000000..d52c55f9630
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import java.io.PrintWriter;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles erroneous commands.
+ */
+public final class ErroneousCommandHandler implements Commands.Handler {
+ private final String message;
+
+ public ErroneousCommandHandler(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) {
+ writer.println(message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(message);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof ErroneousCommandHandler)) return false;
+ ErroneousCommandHandler o = (ErroneousCommandHandler) other;
+ if (!Objects.equals(o.message, message)) return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "ErroneousCommandHandler(" + message + ")";
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java
new file mode 100644
index 00000000000..2b11b352a8f
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Implements the exit command.
+ */
+public final class ExitCommandHandler implements Commands.Handler {
+ public final static Commands.Type TYPE = new ExitCommandType();
+
+ public static class ExitCommandType implements Commands.Type {
+ private ExitCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "exit";
+ }
+
+ @Override
+ public String description() {
+ return "Exit the metadata shell.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return true;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ // nothing to do
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ return new ExitCommandHandler();
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ // nothing to do
+ }
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) {
+ Exit.exit(0);
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof ExitCommandHandler)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java
new file mode 100644
index 00000000000..6d9ae44654b
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Implements the find command.
+ */
+public final class FindCommandHandler implements Commands.Handler {
+ public final static Commands.Type TYPE = new FindCommandType();
+
+ public static class FindCommandType implements Commands.Type {
+ private FindCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "find";
+ }
+
+ @Override
+ public String description() {
+ return "Search for nodes in the directory hierarchy.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return false;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ parser.addArgument("paths").
+ nargs("*").
+ help("The paths to start at.");
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ return new FindCommandHandler(namespace.getList("paths"));
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
+ candidates);
+ }
+ }
+
+ private final List paths;
+
+ public FindCommandHandler(List paths) {
+ this.paths = paths;
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) throws Exception {
+ for (String path : CommandUtils.getEffectivePaths(paths)) {
+ manager.visit(new GlobVisitor(path, entryOption -> {
+ if (entryOption.isPresent()) {
+ find(writer, path, entryOption.get().node());
+ } else {
+ writer.println("find: " + path + ": no such file or directory.");
+ }
+ }));
+ }
+ }
+
+ private void find(PrintWriter writer, String path, MetadataNode node) {
+ writer.println(path);
+ if (node instanceof DirectoryNode) {
+ DirectoryNode directory = (DirectoryNode) node;
+ for (Entry entry : directory.children().entrySet()) {
+ String nextPath = path.equals("/") ?
+ path + entry.getKey() : path + "/" + entry.getKey();
+ find(writer, nextPath, entry.getValue());
+ }
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(paths);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof FindCommandHandler)) return false;
+ FindCommandHandler o = (FindCommandHandler) other;
+ if (!Objects.equals(o.paths, paths)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java b/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java
new file mode 100644
index 00000000000..b93382b258e
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Pattern;
+
+/**
+ * Implements a per-path-component glob.
+ */
+public final class GlobComponent {
+ private static final Logger log = LoggerFactory.getLogger(GlobComponent.class);
+
+ /**
+ * Returns true if the character is a special character for regular expressions.
+ */
+ private static boolean isRegularExpressionSpecialCharacter(char ch) {
+ switch (ch) {
+ case '$':
+ case '(':
+ case ')':
+ case '+':
+ case '.':
+ case '[':
+ case ']':
+ case '^':
+ case '{':
+ case '|':
+ return true;
+ default:
+ break;
+ }
+ return false;
+ }
+
+ /**
+ * Returns true if the character is a special character for globs.
+ */
+ private static boolean isGlobSpecialCharacter(char ch) {
+ switch (ch) {
+ case '*':
+ case '?':
+ case '\\':
+ case '{':
+ case '}':
+ return true;
+ default:
+ break;
+ }
+ return false;
+ }
+
+ /**
+ * Converts a glob string to a regular expression string.
+ * Returns null if the glob should be handled as a literal (can only match one string).
+ * Throws an exception if the glob is malformed.
+ */
+ static String toRegularExpression(String glob) {
+ StringBuilder output = new StringBuilder("^");
+ boolean literal = true;
+ boolean processingGroup = false;
+
+ for (int i = 0; i < glob.length(); ) {
+ char c = glob.charAt(i++);
+ switch (c) {
+ case '?':
+ literal = false;
+ output.append(".");
+ break;
+ case '*':
+ literal = false;
+ output.append(".*");
+ break;
+ case '\\':
+ if (i == glob.length()) {
+ output.append(c);
+ } else {
+ char next = glob.charAt(i);
+ i++;
+ if (isGlobSpecialCharacter(next) ||
+ isRegularExpressionSpecialCharacter(next)) {
+ output.append('\\');
+ }
+ output.append(next);
+ }
+ break;
+ case '{':
+ if (processingGroup) {
+ throw new RuntimeException("Can't nest glob groups.");
+ }
+ literal = false;
+ output.append("(?:(?:");
+ processingGroup = true;
+ break;
+ case ',':
+ if (processingGroup) {
+ literal = false;
+ output.append(")|(?:");
+ } else {
+ output.append(c);
+ }
+ break;
+ case '}':
+ if (processingGroup) {
+ literal = false;
+ output.append("))");
+ processingGroup = false;
+ } else {
+ output.append(c);
+ }
+ break;
+ // TODO: handle character ranges
+ default:
+ if (isRegularExpressionSpecialCharacter(c)) {
+ output.append('\\');
+ }
+ output.append(c);
+ }
+ }
+ if (processingGroup) {
+ throw new RuntimeException("Unterminated glob group.");
+ }
+ if (literal) {
+ return null;
+ }
+ output.append('$');
+ return output.toString();
+ }
+
+ private final String component;
+ private final Pattern pattern;
+
+ public GlobComponent(String component) {
+ this.component = component;
+ Pattern newPattern = null;
+ try {
+ String regularExpression = toRegularExpression(component);
+ if (regularExpression != null) {
+ newPattern = Pattern.compile(regularExpression);
+ }
+ } catch (RuntimeException e) {
+ log.debug("Invalid glob pattern: " + e.getMessage());
+ }
+ this.pattern = newPattern;
+ }
+
+ public String component() {
+ return component;
+ }
+
+ public boolean literal() {
+ return pattern == null;
+ }
+
+ public boolean matches(String nodeName) {
+ if (pattern == null) {
+ return component.equals(nodeName);
+ } else {
+ return pattern.matcher(nodeName).matches();
+ }
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java b/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java
new file mode 100644
index 00000000000..8081b7e4450
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Visits metadata paths based on a glob string.
+ */
+public final class GlobVisitor implements Consumer {
+ private final String glob;
+ private final Consumer> handler;
+
+ public GlobVisitor(String glob,
+ Consumer> handler) {
+ this.glob = glob;
+ this.handler = handler;
+ }
+
+ public static class MetadataNodeInfo {
+ private final String[] path;
+ private final MetadataNode node;
+
+ MetadataNodeInfo(String[] path, MetadataNode node) {
+ this.path = path;
+ this.node = node;
+ }
+
+ public String[] path() {
+ return path;
+ }
+
+ public MetadataNode node() {
+ return node;
+ }
+
+ public String lastPathComponent() {
+ if (path.length == 0) {
+ return "/";
+ } else {
+ return path[path.length - 1];
+ }
+ }
+
+ public String absolutePath() {
+ return "/" + String.join("/", path);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path, node);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof MetadataNodeInfo)) return false;
+ MetadataNodeInfo other = (MetadataNodeInfo) o;
+ if (!Arrays.equals(path, other.path)) return false;
+ if (!node.equals(other.node)) return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder("MetadataNodeInfo(path=");
+ for (int i = 0; i < path.length; i++) {
+ bld.append("/");
+ bld.append(path[i]);
+ }
+ bld.append(", node=").append(node).append(")");
+ return bld.toString();
+ }
+ }
+
+ @Override
+ public void accept(MetadataNodeManager.Data data) {
+ String fullGlob = glob.startsWith("/") ? glob :
+ data.workingDirectory() + "/" + glob;
+ List globComponents =
+ CommandUtils.stripDotPathComponents(CommandUtils.splitPath(fullGlob));
+ if (!accept(globComponents, 0, data.root(), new String[0])) {
+ handler.accept(Optional.empty());
+ }
+ }
+
+ private boolean accept(List globComponents,
+ int componentIndex,
+ MetadataNode node,
+ String[] path) {
+ if (componentIndex >= globComponents.size()) {
+ handler.accept(Optional.of(new MetadataNodeInfo(path, node)));
+ return true;
+ }
+ String globComponentString = globComponents.get(componentIndex);
+ GlobComponent globComponent = new GlobComponent(globComponentString);
+ if (globComponent.literal()) {
+ if (!(node instanceof MetadataNode.DirectoryNode)) {
+ return false;
+ }
+ MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node;
+ MetadataNode child = directory.child(globComponent.component());
+ if (child == null) {
+ return false;
+ }
+ String[] newPath = new String[path.length + 1];
+ System.arraycopy(path, 0, newPath, 0, path.length);
+ newPath[path.length] = globComponent.component();
+ return accept(globComponents, componentIndex + 1, child, newPath);
+ }
+ if (!(node instanceof MetadataNode.DirectoryNode)) {
+ return false;
+ }
+ MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node;
+ boolean matchedAny = false;
+ for (Entry entry : directory.children().entrySet()) {
+ String nodeName = entry.getKey();
+ if (globComponent.matches(nodeName)) {
+ String[] newPath = new String[path.length + 1];
+ System.arraycopy(path, 0, newPath, 0, path.length);
+ newPath[path.length] = nodeName;
+ if (accept(globComponents, componentIndex + 1, entry.getValue(), newPath)) {
+ matchedAny = true;
+ }
+ }
+ }
+ return matchedAny;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java
new file mode 100644
index 00000000000..829274eefcc
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Implements the help command.
+ */
+public final class HelpCommandHandler implements Commands.Handler {
+ public final static Commands.Type TYPE = new HelpCommandType();
+
+ public static class HelpCommandType implements Commands.Type {
+ private HelpCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "help";
+ }
+
+ @Override
+ public String description() {
+ return "Display this help message.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return true;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ // nothing to do
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ return new HelpCommandHandler();
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ // nothing to do
+ }
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) {
+ writer.printf("Welcome to the Apache Kafka metadata shell.%n%n");
+ new Commands(true).parser().printHelp(writer);
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof HelpCommandHandler)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java
new file mode 100644
index 00000000000..edf9def4c87
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Implements the history command.
+ */
+public final class HistoryCommandHandler implements Commands.Handler {
+ public final static Commands.Type TYPE = new HistoryCommandType();
+
+ public static class HistoryCommandType implements Commands.Type {
+ private HistoryCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "history";
+ }
+
+ @Override
+ public String description() {
+ return "Print command history.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return true;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ parser.addArgument("numEntriesToShow").
+ nargs("?").
+ type(Integer.class).
+ help("The number of entries to show.");
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ Integer numEntriesToShow = namespace.getInt("numEntriesToShow");
+ return new HistoryCommandHandler(numEntriesToShow == null ?
+ Integer.MAX_VALUE : numEntriesToShow);
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ // nothing to do
+ }
+ }
+
+ private final int numEntriesToShow;
+
+ public HistoryCommandHandler(int numEntriesToShow) {
+ this.numEntriesToShow = numEntriesToShow;
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) throws Exception {
+ if (!shell.isPresent()) {
+ throw new RuntimeException("The history command requires a shell.");
+ }
+ Iterator> iter = shell.get().history(numEntriesToShow);
+ while (iter.hasNext()) {
+ Map.Entry entry = iter.next();
+ writer.printf("% 5d %s%n", entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return numEntriesToShow;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof HistoryCommandHandler)) return false;
+ HistoryCommandHandler o = (HistoryCommandHandler) other;
+ return o.numEntriesToShow == numEntriesToShow;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java b/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java
new file mode 100644
index 00000000000..aa4d4ea56cf
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.jline.reader.Candidate;
+import org.jline.reader.Completer;
+import org.jline.reader.EndOfFileException;
+import org.jline.reader.History;
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.ParsedLine;
+import org.jline.reader.Parser;
+import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.DefaultParser;
+import org.jline.reader.impl.history.DefaultHistory;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.TerminalBuilder;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * The Kafka metadata shell.
+ */
+public final class InteractiveShell implements AutoCloseable {
+ static class MetadataShellCompleter implements Completer {
+ private final MetadataNodeManager nodeManager;
+
+ MetadataShellCompleter(MetadataNodeManager nodeManager) {
+ this.nodeManager = nodeManager;
+ }
+
+ @Override
+ public void complete(LineReader reader, ParsedLine line, List candidates) {
+ if (line.words().size() == 0) {
+ CommandUtils.completeCommand("", candidates);
+ } else if (line.words().size() == 1) {
+ CommandUtils.completeCommand(line.words().get(0), candidates);
+ } else {
+ Iterator iter = line.words().iterator();
+ String command = iter.next();
+ List nextWords = new ArrayList<>();
+ while (iter.hasNext()) {
+ nextWords.add(iter.next());
+ }
+ Commands.Type type = Commands.TYPES.get(command);
+ if (type == null) {
+ return;
+ }
+ try {
+ type.completeNext(nodeManager, nextWords, candidates);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private final MetadataNodeManager nodeManager;
+ private final Terminal terminal;
+ private final Parser parser;
+ private final History history;
+ private final MetadataShellCompleter completer;
+ private final LineReader reader;
+
+ public InteractiveShell(MetadataNodeManager nodeManager) throws IOException {
+ this.nodeManager = nodeManager;
+ TerminalBuilder builder = TerminalBuilder.builder().
+ system(true).
+ nativeSignals(true);
+ this.terminal = builder.build();
+ this.parser = new DefaultParser();
+ this.history = new DefaultHistory();
+ this.completer = new MetadataShellCompleter(nodeManager);
+ this.reader = LineReaderBuilder.builder().
+ terminal(terminal).
+ parser(parser).
+ history(history).
+ completer(completer).
+ option(LineReader.Option.AUTO_FRESH_LINE, false).
+ build();
+ }
+
+ public void runMainLoop() throws Exception {
+ terminal.writer().println("[ Kafka Metadata Shell ]");
+ terminal.flush();
+ Commands commands = new Commands(true);
+ while (true) {
+ try {
+ reader.readLine(">> ");
+ ParsedLine parsedLine = reader.getParsedLine();
+ Commands.Handler handler = commands.parseCommand(parsedLine.words());
+ handler.run(Optional.of(this), terminal.writer(), nodeManager);
+ terminal.writer().flush();
+ } catch (UserInterruptException eof) {
+ // Handle the user pressing control-C.
+ terminal.writer().println("^C");
+ } catch (EndOfFileException eof) {
+ return;
+ }
+ }
+ }
+
+ public int screenWidth() {
+ return terminal.getWidth();
+ }
+
+ public Iterator> history(int numEntriesToShow) {
+ if (numEntriesToShow < 0) {
+ numEntriesToShow = 0;
+ }
+ int last = history.last();
+ if (numEntriesToShow > last + 1) {
+ numEntriesToShow = last + 1;
+ }
+ int first = last - numEntriesToShow + 1;
+ if (first < history.first()) {
+ first = history.first();
+ }
+ return new HistoryIterator(first, last);
+ }
+
+ public class HistoryIterator implements Iterator> {
+ private int index;
+ private int last;
+
+ HistoryIterator(int index, int last) {
+ this.index = index;
+ this.last = last;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index <= last;
+ }
+
+ @Override
+ public Entry next() {
+ if (index > last) {
+ throw new NoSuchElementException();
+ }
+ int p = index++;
+ return new AbstractMap.SimpleImmutableEntry<>(p, history.get(p));
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ terminal.close();
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java
new file mode 100644
index 00000000000..6260d122bfe
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.jline.reader.Candidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * Implements the ls command.
+ */
+public final class LsCommandHandler implements Commands.Handler {
+ private static final Logger log = LoggerFactory.getLogger(LsCommandHandler.class);
+
+ public final static Commands.Type TYPE = new LsCommandType();
+
+ public static class LsCommandType implements Commands.Type {
+ private LsCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "ls";
+ }
+
+ @Override
+ public String description() {
+ return "List metadata nodes.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return false;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ parser.addArgument("targets").
+ nargs("*").
+ help("The metadata node paths to list.");
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ return new LsCommandHandler(namespace.getList("targets"));
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
+ candidates);
+ }
+ }
+
+ private final List targets;
+
+ public LsCommandHandler(List targets) {
+ this.targets = targets;
+ }
+
+ static class TargetDirectory {
+ private final String name;
+ private final List children;
+
+ TargetDirectory(String name, List children) {
+ this.name = name;
+ this.children = children;
+ }
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) throws Exception {
+ List targetFiles = new ArrayList<>();
+ List targetDirectories = new ArrayList<>();
+ for (String target : CommandUtils.getEffectivePaths(targets)) {
+ manager.visit(new GlobVisitor(target, entryOption -> {
+ if (entryOption.isPresent()) {
+ MetadataNodeInfo info = entryOption.get();
+ MetadataNode node = info.node();
+ if (node instanceof DirectoryNode) {
+ DirectoryNode directory = (DirectoryNode) node;
+ List children = new ArrayList<>();
+ children.addAll(directory.children().keySet());
+ targetDirectories.add(
+ new TargetDirectory(info.lastPathComponent(), children));
+ } else if (node instanceof FileNode) {
+ targetFiles.add(info.lastPathComponent());
+ }
+ } else {
+ writer.println("ls: " + target + ": no such file or directory.");
+ }
+ }));
+ }
+ OptionalInt screenWidth = shell.isPresent() ?
+ OptionalInt.of(shell.get().screenWidth()) : OptionalInt.empty();
+ log.trace("LS : targetFiles = {}, targetDirectories = {}, screenWidth = {}",
+ targetFiles, targetDirectories, screenWidth);
+ printTargets(writer, screenWidth, targetFiles, targetDirectories);
+ }
+
+ static void printTargets(PrintWriter writer,
+ OptionalInt screenWidth,
+ List targetFiles,
+ List targetDirectories) {
+ printEntries(writer, "", screenWidth, targetFiles);
+ boolean needIntro = targetFiles.size() > 0 || targetDirectories.size() > 1;
+ boolean firstIntro = targetFiles.isEmpty();
+ for (TargetDirectory targetDirectory : targetDirectories) {
+ String intro = "";
+ if (needIntro) {
+ if (!firstIntro) {
+ intro = intro + String.format("%n");
+ }
+ intro = intro + targetDirectory.name + ":";
+ firstIntro = false;
+ }
+ log.trace("LS : targetDirectory name = {}, children = {}",
+ targetDirectory.name, targetDirectory.children);
+ printEntries(writer, intro, screenWidth, targetDirectory.children);
+ }
+ }
+
+ static void printEntries(PrintWriter writer,
+ String intro,
+ OptionalInt screenWidth,
+ List entries) {
+ if (entries.isEmpty()) {
+ return;
+ }
+ if (!intro.isEmpty()) {
+ writer.println(intro);
+ }
+ ColumnSchema columnSchema = calculateColumnSchema(screenWidth, entries);
+ int numColumns = columnSchema.numColumns();
+ int numLines = (entries.size() + numColumns - 1) / numColumns;
+ for (int line = 0; line < numLines; line++) {
+ StringBuilder output = new StringBuilder();
+ for (int column = 0; column < numColumns; column++) {
+ int entryIndex = line + (column * columnSchema.entriesPerColumn());
+ if (entryIndex < entries.size()) {
+ String entry = entries.get(entryIndex);
+ output.append(entry);
+ if (column < numColumns - 1) {
+ int width = columnSchema.columnWidth(column);
+ for (int i = 0; i < width - entry.length(); i++) {
+ output.append(" ");
+ }
+ }
+ }
+ }
+ writer.println(output.toString());
+ }
+ }
+
+ static ColumnSchema calculateColumnSchema(OptionalInt screenWidth,
+ List entries) {
+ if (!screenWidth.isPresent()) {
+ return new ColumnSchema(1, entries.size());
+ }
+ int maxColumns = screenWidth.getAsInt() / 4;
+ if (maxColumns <= 1) {
+ return new ColumnSchema(1, entries.size());
+ }
+ ColumnSchema[] schemas = new ColumnSchema[maxColumns];
+ for (int numColumns = 1; numColumns <= maxColumns; numColumns++) {
+ schemas[numColumns - 1] = new ColumnSchema(numColumns,
+ (entries.size() + numColumns - 1) / numColumns);
+ }
+ for (int i = 0; i < entries.size(); i++) {
+ String entry = entries.get(i);
+ for (int s = 0; s < schemas.length; s++) {
+ ColumnSchema schema = schemas[s];
+ schema.process(i, entry);
+ }
+ }
+ for (int s = schemas.length - 1; s > 0; s--) {
+ ColumnSchema schema = schemas[s];
+ if (schema.columnWidths[schema.columnWidths.length - 1] != 0 &&
+ schema.totalWidth() <= screenWidth.getAsInt()) {
+ return schema;
+ }
+ }
+ return schemas[0];
+ }
+
+ static class ColumnSchema {
+ private final int[] columnWidths;
+ private final int entriesPerColumn;
+
+ ColumnSchema(int numColumns, int entriesPerColumn) {
+ this.columnWidths = new int[numColumns];
+ this.entriesPerColumn = entriesPerColumn;
+ }
+
+ ColumnSchema setColumnWidths(Integer... widths) {
+ for (int i = 0; i < widths.length; i++) {
+ columnWidths[i] = widths[i];
+ }
+ return this;
+ }
+
+ void process(int entryIndex, String output) {
+ int columnIndex = entryIndex / entriesPerColumn;
+ columnWidths[columnIndex] = Math.max(
+ columnWidths[columnIndex], output.length() + 2);
+ }
+
+ int totalWidth() {
+ int total = 0;
+ for (int i = 0; i < columnWidths.length; i++) {
+ total += columnWidths[i];
+ }
+ return total;
+ }
+
+ int numColumns() {
+ return columnWidths.length;
+ }
+
+ int columnWidth(int columnIndex) {
+ return columnWidths[columnIndex];
+ }
+
+ int entriesPerColumn() {
+ return entriesPerColumn;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columnWidths, entriesPerColumn);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof ColumnSchema)) return false;
+ ColumnSchema other = (ColumnSchema) o;
+ if (entriesPerColumn != other.entriesPerColumn) return false;
+ if (!Arrays.equals(columnWidths, other.columnWidths)) return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder("ColumnSchema(columnWidths=[");
+ String prefix = "";
+ for (int i = 0; i < columnWidths.length; i++) {
+ bld.append(prefix);
+ bld.append(columnWidths[i]);
+ prefix = ", ";
+ }
+ bld.append("], entriesPerColumn=").append(entriesPerColumn).append(")");
+ return bld.toString();
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(targets);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof LsCommandHandler)) return false;
+ LsCommandHandler o = (LsCommandHandler) other;
+ if (!Objects.equals(o.targets, targets)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java
new file mode 100644
index 00000000000..dcd0b8cd716
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Implements the manual command.
+ */
+public final class ManCommandHandler implements Commands.Handler {
+ private final String cmd;
+
+ public final static Commands.Type TYPE = new ManCommandType();
+
+ public static class ManCommandType implements Commands.Type {
+ private ManCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "man";
+ }
+
+ @Override
+ public String description() {
+ return "Show the help text for a specific command.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return true;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ parser.addArgument("cmd").
+ nargs(1).
+ help("The command to get help text for.");
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ return new ManCommandHandler(namespace.getList("cmd").get(0));
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ if (nextWords.size() == 1) {
+ CommandUtils.completeCommand(nextWords.get(0), candidates);
+ }
+ }
+ }
+
+ public ManCommandHandler(String cmd) {
+ this.cmd = cmd;
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) {
+ Commands.Type type = Commands.TYPES.get(cmd);
+ if (type == null) {
+ writer.println("man: unknown command " + cmd +
+ ". Type help to get a list of commands.");
+ } else {
+ ArgumentParser parser = ArgumentParsers.newArgumentParser(type.name(), false);
+ type.addArguments(parser);
+ writer.printf("%s: %s%n%n", cmd, type.description());
+ parser.printHelp(writer);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return cmd.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof ManCommandHandler)) return false;
+ ManCommandHandler o = (ManCommandHandler) other;
+ if (!o.cmd.equals(cmd)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java
new file mode 100644
index 00000000000..3764a17b8b0
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * A node in the metadata tool.
+ */
+public interface MetadataNode {
+ class DirectoryNode implements MetadataNode {
+ private final TreeMap children = new TreeMap<>();
+
+ public DirectoryNode mkdirs(String... names) {
+ if (names.length == 0) {
+ throw new RuntimeException("Invalid zero-length path");
+ }
+ DirectoryNode node = this;
+ for (int i = 0; i < names.length; i++) {
+ MetadataNode nextNode = node.children.get(names[i]);
+ if (nextNode == null) {
+ nextNode = new DirectoryNode();
+ node.children.put(names[i], nextNode);
+ } else {
+ if (!(nextNode instanceof DirectoryNode)) {
+ throw new NotDirectoryException();
+ }
+ }
+ node = (DirectoryNode) nextNode;
+ }
+ return node;
+ }
+
+ public void rmrf(String... names) {
+ if (names.length == 0) {
+ throw new RuntimeException("Invalid zero-length path");
+ }
+ DirectoryNode node = this;
+ for (int i = 0; i < names.length - 1; i++) {
+ MetadataNode nextNode = node.children.get(names[i]);
+ if (nextNode == null || !(nextNode instanceof DirectoryNode)) {
+ throw new RuntimeException("Unable to locate directory /" +
+ String.join("/", names));
+ }
+ node = (DirectoryNode) nextNode;
+ }
+ node.children.remove(names[names.length - 1]);
+ }
+
+ public FileNode create(String name) {
+ MetadataNode node = children.get(name);
+ if (node == null) {
+ node = new FileNode();
+ children.put(name, node);
+ } else {
+ if (!(node instanceof FileNode)) {
+ throw new NotFileException();
+ }
+ }
+ return (FileNode) node;
+ }
+
+ public MetadataNode child(String component) {
+ return children.get(component);
+ }
+
+ public NavigableMap children() {
+ return children;
+ }
+
+ public void addChild(String name, DirectoryNode child) {
+ children.put(name, child);
+ }
+
+ public DirectoryNode directory(String... names) {
+ if (names.length == 0) {
+ throw new RuntimeException("Invalid zero-length path");
+ }
+ DirectoryNode node = this;
+ for (int i = 0; i < names.length; i++) {
+ MetadataNode nextNode = node.children.get(names[i]);
+ if (nextNode == null || !(nextNode instanceof DirectoryNode)) {
+ throw new RuntimeException("Unable to locate directory /" +
+ String.join("/", names));
+ }
+ node = (DirectoryNode) nextNode;
+ }
+ return node;
+ }
+
+ public FileNode file(String... names) {
+ if (names.length == 0) {
+ throw new RuntimeException("Invalid zero-length path");
+ }
+ DirectoryNode node = this;
+ for (int i = 0; i < names.length - 1; i++) {
+ MetadataNode nextNode = node.children.get(names[i]);
+ if (nextNode == null || !(nextNode instanceof DirectoryNode)) {
+ throw new RuntimeException("Unable to locate file /" +
+ String.join("/", names));
+ }
+ node = (DirectoryNode) nextNode;
+ }
+ MetadataNode nextNode = node.child(names[names.length - 1]);
+ if (nextNode == null || !(nextNode instanceof FileNode)) {
+ throw new RuntimeException("Unable to locate file /" +
+ String.join("/", names));
+ }
+ return (FileNode) nextNode;
+ }
+ }
+
+ class FileNode implements MetadataNode {
+ private String contents;
+
+ void setContents(String contents) {
+ this.contents = contents;
+ }
+
+ String contents() {
+ return contents;
+ }
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
new file mode 100644
index 00000000000..7910285d484
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Maintains the in-memory metadata for the metadata tool.
+ */
+public final class MetadataNodeManager implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
+
+ public static class Data {
+ private final DirectoryNode root = new DirectoryNode();
+ private String workingDirectory = "/";
+
+ public DirectoryNode root() {
+ return root;
+ }
+
+ public String workingDirectory() {
+ return workingDirectory;
+ }
+
+ public void setWorkingDirectory(String workingDirectory) {
+ this.workingDirectory = workingDirectory;
+ }
+ }
+
+ class LogListener implements MetaLogListener, RaftClient.Listener {
+ @Override
+ public void handleCommit(BatchReader reader) {
+ try {
+ // TODO: handle lastOffset
+ while (reader.hasNext()) {
+ BatchReader.Batch batch = reader.next();
+ for (ApiMessageAndVersion messageAndVersion : batch.records()) {
+ handleMessage(messageAndVersion.message());
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Override
+ public void handleCommits(long lastOffset, List messages) {
+ appendEvent("handleCommits", () -> {
+ log.error("handleCommits " + messages + " at offset " + lastOffset);
+ DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+ dir.create("offset").setContents(String.valueOf(lastOffset));
+ for (ApiMessage message : messages) {
+ handleMessage(message);
+ }
+ }, null);
+ }
+
+ @Override
+ public void handleNewLeader(MetaLogLeader leader) {
+ appendEvent("handleNewLeader", () -> {
+ log.error("handleNewLeader " + leader);
+ DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+ dir.create("leader").setContents(leader.toString());
+ }, null);
+ }
+
+ @Override
+ public void handleClaim(int epoch) {
+ // This shouldn't happen because we should never be the leader.
+ log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
+ }
+
+ @Override
+ public void handleRenounce(long epoch) {
+ // This shouldn't happen because we should never be the leader.
+ log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
+ }
+
+ @Override
+ public void beginShutdown() {
+ log.debug("MetaLogListener sent beginShutdown");
+ }
+ }
+
+ private final Data data = new Data();
+ private final LogListener logListener = new LogListener();
+ private final ObjectMapper objectMapper;
+ private final KafkaEventQueue queue;
+
+ public MetadataNodeManager() {
+ this.objectMapper = new ObjectMapper();
+ this.objectMapper.registerModule(new Jdk8Module());
+ this.queue = new KafkaEventQueue(Time.SYSTEM,
+ new LogContext("[node-manager-event-queue] "), "");
+ }
+
+ public void setup() throws Exception {
+ CompletableFuture future = new CompletableFuture<>();
+ appendEvent("createShellNodes", () -> {
+ DirectoryNode directory = data.root().mkdirs("local");
+ directory.create("version").setContents(AppInfoParser.getVersion());
+ directory.create("commitId").setContents(AppInfoParser.getCommitId());
+ future.complete(null);
+ }, future);
+ future.get();
+ }
+
+ public LogListener logListener() {
+ return logListener;
+ }
+
+ @Override
+ public void close() throws Exception {
+ queue.close();
+ }
+
+ public void visit(Consumer consumer) throws Exception {
+ CompletableFuture future = new CompletableFuture<>();
+ appendEvent("visit", () -> {
+ consumer.accept(data);
+ future.complete(null);
+ }, future);
+ future.get();
+ }
+
+ private void appendEvent(String name, Runnable runnable, CompletableFuture> future) {
+ queue.append(new EventQueue.Event() {
+ @Override
+ public void run() throws Exception {
+ runnable.run();
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Unexpected error while handling event " + name, e);
+ if (future != null) {
+ future.completeExceptionally(e);
+ }
+ }
+ });
+ }
+
+ private void handleMessage(ApiMessage message) {
+ try {
+ MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+ handleCommitImpl(type, message);
+ } catch (Exception e) {
+ log.error("Error processing record of type " + message.apiKey(), e);
+ }
+ }
+
+ private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
+ throws Exception {
+ switch (type) {
+ case REGISTER_BROKER_RECORD: {
+ DirectoryNode brokersNode = data.root.mkdirs("brokers");
+ RegisterBrokerRecord record = (RegisterBrokerRecord) message;
+ DirectoryNode brokerNode = brokersNode.
+ mkdirs(Integer.toString(record.brokerId()));
+ FileNode registrationNode = brokerNode.create("registration");
+ registrationNode.setContents(record.toString());
+ brokerNode.create("isFenced").setContents("true");
+ break;
+ }
+ case UNREGISTER_BROKER_RECORD: {
+ UnregisterBrokerRecord record = (UnregisterBrokerRecord) message;
+ data.root.rmrf("brokers", Integer.toString(record.brokerId()));
+ break;
+ }
+ case TOPIC_RECORD: {
+ TopicRecord record = (TopicRecord) message;
+ DirectoryNode topicsDirectory = data.root.mkdirs("topics");
+ DirectoryNode topicDirectory = topicsDirectory.mkdirs(record.name());
+ topicDirectory.create("id").setContents(record.topicId().toString());
+ topicDirectory.create("name").setContents(record.name().toString());
+ DirectoryNode topicIdsDirectory = data.root.mkdirs("topicIds");
+ topicIdsDirectory.addChild(record.topicId().toString(), topicDirectory);
+ break;
+ }
+ case PARTITION_RECORD: {
+ PartitionRecord record = (PartitionRecord) message;
+ DirectoryNode topicDirectory =
+ data.root.mkdirs("topicIds").mkdirs(record.topicId().toString());
+ DirectoryNode partitionDirectory =
+ topicDirectory.mkdirs(Integer.toString(record.partitionId()));
+ JsonNode node = PartitionRecordJsonConverter.
+ write(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION);
+ partitionDirectory.create("data").setContents(node.toPrettyString());
+ break;
+ }
+ case CONFIG_RECORD: {
+ ConfigRecord record = (ConfigRecord) message;
+ String typeString = "";
+ switch (ConfigResource.Type.forId(record.resourceType())) {
+ case BROKER:
+ typeString = "broker";
+ break;
+ case TOPIC:
+ typeString = "topic";
+ break;
+ default:
+ throw new RuntimeException("Error processing CONFIG_RECORD: " +
+ "Can't handle ConfigResource.Type " + record.resourceType());
+ }
+ DirectoryNode configDirectory = data.root.mkdirs("configs").
+ mkdirs(typeString).mkdirs(record.resourceName());
+ if (record.value() == null) {
+ configDirectory.rmrf(record.name());
+ } else {
+ configDirectory.create(record.name()).setContents(record.value());
+ }
+ break;
+ }
+ case ISR_CHANGE_RECORD: {
+ IsrChangeRecord record = (IsrChangeRecord) message;
+ FileNode file = data.root.file("topicIds", record.topicId().toString(),
+ Integer.toString(record.partitionId()), "data");
+ JsonNode node = objectMapper.readTree(file.contents());
+ PartitionRecord partition = PartitionRecordJsonConverter.
+ read(node, PartitionRecord.HIGHEST_SUPPORTED_VERSION);
+ partition.setIsr(record.isr());
+ partition.setLeader(record.leader());
+ partition.setLeaderEpoch(record.leaderEpoch());
+ partition.setPartitionEpoch(record.partitionEpoch());
+ file.setContents(PartitionRecordJsonConverter.write(partition,
+ PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+ break;
+ }
+ case FENCE_BROKER_RECORD: {
+ FenceBrokerRecord record = (FenceBrokerRecord) message;
+ data.root.mkdirs("brokers", Integer.toString(record.id())).
+ create("isFenced").setContents("true");
+ break;
+ }
+ case UNFENCE_BROKER_RECORD: {
+ UnfenceBrokerRecord record = (UnfenceBrokerRecord) message;
+ data.root.mkdirs("brokers", Integer.toString(record.id())).
+ create("isFenced").setContents("false");
+ break;
+ }
+ case REMOVE_TOPIC_RECORD: {
+ RemoveTopicRecord record = (RemoveTopicRecord) message;
+ DirectoryNode topicsDirectory =
+ data.root.directory("topicIds", record.topicId().toString());
+ String name = topicsDirectory.file("name").contents();
+ data.root.rmrf("topics", name);
+ data.root.rmrf("topicIds", record.topicId().toString());
+ break;
+ }
+ default:
+ throw new RuntimeException("Unhandled metadata record type");
+ }
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
new file mode 100644
index 00000000000..b701310efb7
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.tools.TerseFailure;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * The Kafka metadata shell.
+ */
+public final class MetadataShell {
+ private static final Logger log = LoggerFactory.getLogger(MetadataShell.class);
+
+ public static class Builder {
+ private String snapshotPath;
+
+ public Builder setSnapshotPath(String snapshotPath) {
+ this.snapshotPath = snapshotPath;
+ return this;
+ }
+
+ public MetadataShell build() throws Exception {
+ MetadataNodeManager nodeManager = null;
+ SnapshotFileReader reader = null;
+ try {
+ nodeManager = new MetadataNodeManager();
+ reader = new SnapshotFileReader(snapshotPath, nodeManager.logListener());
+ return new MetadataShell(null, reader, nodeManager);
+ } catch (Throwable e) {
+ log.error("Initialization error", e);
+ if (reader != null) {
+ reader.close();
+ }
+ if (nodeManager != null) {
+ nodeManager.close();
+ }
+ throw e;
+ }
+ }
+ }
+
+ private final KafkaRaftManager raftManager;
+
+ private final SnapshotFileReader snapshotFileReader;
+
+ private final MetadataNodeManager nodeManager;
+
+ public MetadataShell(KafkaRaftManager raftManager,
+ SnapshotFileReader snapshotFileReader,
+ MetadataNodeManager nodeManager) {
+ this.raftManager = raftManager;
+ this.snapshotFileReader = snapshotFileReader;
+ this.nodeManager = nodeManager;
+ }
+
+ public void run(List args) throws Exception {
+ nodeManager.setup();
+ if (raftManager != null) {
+ raftManager.startup();
+ raftManager.register(nodeManager.logListener());
+ } else if (snapshotFileReader != null) {
+ snapshotFileReader.startup();
+ } else {
+ throw new RuntimeException("Expected either a raft manager or snapshot reader");
+ }
+ if (args == null || args.isEmpty()) {
+ // Interactive mode.
+ try (InteractiveShell shell = new InteractiveShell(nodeManager)) {
+ shell.runMainLoop();
+ }
+ } else {
+ // Non-interactive mode.
+ Commands commands = new Commands(false);
+ try (PrintWriter writer = new PrintWriter(new BufferedWriter(
+ new OutputStreamWriter(System.out, StandardCharsets.UTF_8)))) {
+ Commands.Handler handler = commands.parseCommand(args);
+ handler.run(Optional.empty(), writer, nodeManager);
+ writer.flush();
+ }
+ }
+ }
+
+ public void close() throws Exception {
+ if (raftManager != null) {
+ raftManager.shutdown();
+ }
+ if (snapshotFileReader != null) {
+ snapshotFileReader.close();
+ }
+ nodeManager.close();
+ }
+
+ public static void main(String[] args) throws Exception {
+ ArgumentParser parser = ArgumentParsers
+ .newArgumentParser("metadata-tool")
+ .defaultHelp(true)
+ .description("The Apache Kafka metadata tool");
+ parser.addArgument("--snapshot", "-s")
+ .type(String.class)
+ .help("The snapshot file to read.");
+ parser.addArgument("command")
+ .nargs("*")
+ .help("The command to run.");
+ Namespace res = parser.parseArgsOrFail(args);
+ try {
+ Builder builder = new Builder();
+ builder.setSnapshotPath(res.getString("snapshot"));
+ Path tempDir = Files.createTempDirectory("MetadataShell");
+ Exit.addShutdownHook("agent-shutdown-hook", () -> {
+ log.debug("Removing temporary directory " + tempDir.toAbsolutePath().toString());
+ try {
+ Utils.delete(tempDir.toFile());
+ } catch (Exception e) {
+ log.error("Got exception while removing temporary directory " +
+ tempDir.toAbsolutePath().toString());
+ }
+ });
+ MetadataShell shell = builder.build();
+ shell.waitUntilCaughtUp();
+ try {
+ shell.run(res.getList("command"));
+ } finally {
+ shell.close();
+ }
+ Exit.exit(0);
+ } catch (TerseFailure e) {
+ System.err.println("Error: " + e.getMessage());
+ Exit.exit(1);
+ } catch (Throwable e) {
+ System.err.println("Unexpected error: " +
+ (e.getMessage() == null ? "" : e.getMessage()));
+ e.printStackTrace(System.err);
+ Exit.exit(1);
+ }
+ }
+
+ void waitUntilCaughtUp() throws ExecutionException, InterruptedException {
+ snapshotFileReader.caughtUpFuture().get();
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
new file mode 100644
index 00000000000..1756ba76aa8
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import java.io.PrintWriter;
+import java.util.Optional;
+
+/**
+ * Does nothing.
+ */
+public final class NoOpCommandHandler implements Commands.Handler {
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) {
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof NoOpCommandHandler)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java b/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java
new file mode 100644
index 00000000000..692534758e2
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+/**
+ * An exception that is thrown when a non-directory node is treated like a
+ * directory.
+ */
+public class NotDirectoryException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public NotDirectoryException() {
+ super();
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NotFileException.java b/shell/src/main/java/org/apache/kafka/shell/NotFileException.java
new file mode 100644
index 00000000000..cbc2a832d67
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/NotFileException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+/**
+ * An exception that is thrown when a non-file node is treated like a
+ * file.
+ */
+public class NotFileException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public NotFileException() {
+ super();
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java
new file mode 100644
index 00000000000..1e5b5da39ef
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Implements the pwd command.
+ */
+public final class PwdCommandHandler implements Commands.Handler {
+ public final static Commands.Type TYPE = new PwdCommandType();
+
+ public static class PwdCommandType implements Commands.Type {
+ private PwdCommandType() {
+ }
+
+ @Override
+ public String name() {
+ return "pwd";
+ }
+
+ @Override
+ public String description() {
+ return "Print the current working directory.";
+ }
+
+ @Override
+ public boolean shellOnly() {
+ return true;
+ }
+
+ @Override
+ public void addArguments(ArgumentParser parser) {
+ // nothing to do
+ }
+
+ @Override
+ public Commands.Handler createHandler(Namespace namespace) {
+ return new PwdCommandHandler();
+ }
+
+ @Override
+ public void completeNext(MetadataNodeManager nodeManager, List nextWords,
+ List candidates) throws Exception {
+ // nothing to do
+ }
+ }
+
+ @Override
+ public void run(Optional shell,
+ PrintWriter writer,
+ MetadataNodeManager manager) throws Exception {
+ manager.visit(data -> {
+ writer.println(data.workingDirectory());
+ });
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof PwdCommandHandler)) return false;
+ return true;
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
new file mode 100644
index 00000000000..e566be67d7d
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * Reads Kafka metadata snapshots.
+ */
+public final class SnapshotFileReader implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(SnapshotFileReader.class);
+
+ private final String snapshotPath;
+ private final MetaLogListener listener;
+ private final KafkaEventQueue queue;
+ private final CompletableFuture caughtUpFuture;
+ private FileRecords fileRecords;
+ private Iterator batchIterator;
+
+ public SnapshotFileReader(String snapshotPath, MetaLogListener listener) {
+ this.snapshotPath = snapshotPath;
+ this.listener = listener;
+ this.queue = new KafkaEventQueue(Time.SYSTEM,
+ new LogContext("[snapshotReaderQueue] "), "snapshotReaderQueue_");
+ this.caughtUpFuture = new CompletableFuture<>();
+ }
+
+ public void startup() throws Exception {
+ CompletableFuture future = new CompletableFuture<>();
+ queue.append(new EventQueue.Event() {
+ @Override
+ public void run() throws Exception {
+ fileRecords = FileRecords.open(new File(snapshotPath), false);
+ batchIterator = fileRecords.batches().iterator();
+ scheduleHandleNextBatch();
+ future.complete(null);
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ future.completeExceptionally(e);
+ beginShutdown("startup error");
+ }
+ });
+ future.get();
+ }
+
+ private void handleNextBatch() {
+ if (!batchIterator.hasNext()) {
+ beginShutdown("done");
+ return;
+ }
+ FileChannelRecordBatch batch = batchIterator.next();
+ if (batch.isControlBatch()) {
+ handleControlBatch(batch);
+ } else {
+ handleMetadataBatch(batch);
+ }
+ scheduleHandleNextBatch();
+ }
+
+ private void scheduleHandleNextBatch() {
+ queue.append(new EventQueue.Event() {
+ @Override
+ public void run() throws Exception {
+ handleNextBatch();
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Unexpected error while handling a batch of events", e);
+ beginShutdown("handleBatch error");
+ }
+ });
+ }
+
+ private void handleControlBatch(FileChannelRecordBatch batch) {
+ for (Iterator iter = batch.iterator(); iter.hasNext(); ) {
+ Record record = iter.next();
+ try {
+ short typeId = ControlRecordType.parseTypeId(record.key());
+ ControlRecordType type = ControlRecordType.fromTypeId(typeId);
+ switch (type) {
+ case LEADER_CHANGE:
+ LeaderChangeMessage message = new LeaderChangeMessage();
+ message.read(new ByteBufferAccessor(record.value()), (short) 0);
+ listener.handleNewLeader(new MetaLogLeader(message.leaderId(),
+ batch.partitionLeaderEpoch()));
+ break;
+ default:
+ log.error("Ignoring control record with type {} at offset {}",
+ type, record.offset());
+ }
+ } catch (Throwable e) {
+ log.error("unable to read control record at offset {}", record.offset(), e);
+ }
+ }
+ }
+
+ private void handleMetadataBatch(FileChannelRecordBatch batch) {
+ List messages = new ArrayList<>();
+ for (Iterator iter = batch.iterator(); iter.hasNext(); ) {
+ Record record = iter.next();
+ ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
+ try {
+ int apiKey = accessor.readUnsignedVarint();
+ if (apiKey > Short.MAX_VALUE || apiKey < 0) {
+ throw new RuntimeException("Invalid apiKey value " + apiKey);
+ }
+ int apiVersion = accessor.readUnsignedVarint();
+ if (apiVersion > Short.MAX_VALUE || apiVersion < 0) {
+ throw new RuntimeException("Invalid apiVersion value " + apiVersion);
+ }
+ ApiMessage message = MetadataRecordType.fromId((short) apiKey).newMetadataRecord();
+ message.read(accessor, (short) apiVersion);
+ messages.add(message);
+ } catch (Throwable e) {
+ log.error("unable to read metadata record at offset {}", record.offset(), e);
+ }
+ }
+ listener.handleCommits(batch.lastOffset(), messages);
+ }
+
+ public void beginShutdown(String reason) {
+ if (reason.equals("done")) {
+ caughtUpFuture.complete(null);
+ } else {
+ caughtUpFuture.completeExceptionally(new RuntimeException(reason));
+ }
+ queue.beginShutdown(reason, new EventQueue.Event() {
+ @Override
+ public void run() throws Exception {
+ listener.beginShutdown();
+ if (fileRecords != null) {
+ fileRecords.close();
+ fileRecords = null;
+ }
+ batchIterator = null;
+ }
+
+ @Override
+ public void handleException(Throwable e) {
+ log.error("shutdown error", e);
+ }
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ beginShutdown("closing");
+ queue.close();
+ }
+
+ public CompletableFuture caughtUpFuture() {
+ return caughtUpFuture;
+ }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/CommandTest.java b/shell/src/test/java/org/apache/kafka/shell/CommandTest.java
new file mode 100644
index 00000000000..c896a06caa0
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/CommandTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class CommandTest {
+ @Test
+ public void testParseCommands() {
+ assertEquals(new CatCommandHandler(Arrays.asList("foo")),
+ new Commands(true).parseCommand(Arrays.asList("cat", "foo")));
+ assertEquals(new CdCommandHandler(Optional.empty()),
+ new Commands(true).parseCommand(Arrays.asList("cd")));
+ assertEquals(new CdCommandHandler(Optional.of("foo")),
+ new Commands(true).parseCommand(Arrays.asList("cd", "foo")));
+ assertEquals(new ExitCommandHandler(),
+ new Commands(true).parseCommand(Arrays.asList("exit")));
+ assertEquals(new HelpCommandHandler(),
+ new Commands(true).parseCommand(Arrays.asList("help")));
+ assertEquals(new HistoryCommandHandler(3),
+ new Commands(true).parseCommand(Arrays.asList("history", "3")));
+ assertEquals(new HistoryCommandHandler(Integer.MAX_VALUE),
+ new Commands(true).parseCommand(Arrays.asList("history")));
+ assertEquals(new LsCommandHandler(Collections.emptyList()),
+ new Commands(true).parseCommand(Arrays.asList("ls")));
+ assertEquals(new LsCommandHandler(Arrays.asList("abc", "123")),
+ new Commands(true).parseCommand(Arrays.asList("ls", "abc", "123")));
+ assertEquals(new PwdCommandHandler(),
+ new Commands(true).parseCommand(Arrays.asList("pwd")));
+ }
+
+ @Test
+ public void testParseInvalidCommand() {
+ assertEquals(new ErroneousCommandHandler("invalid choice: 'blah' (choose " +
+ "from 'cat', 'cd', 'exit', 'find', 'help', 'history', 'ls', 'man', 'pwd')"),
+ new Commands(true).parseCommand(Arrays.asList("blah")));
+ }
+
+ @Test
+ public void testEmptyCommandLine() {
+ assertEquals(new NoOpCommandHandler(),
+ new Commands(true).parseCommand(Arrays.asList("")));
+ assertEquals(new NoOpCommandHandler(),
+ new Commands(true).parseCommand(Collections.emptyList()));
+ }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java b/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java
new file mode 100644
index 00000000000..90c3b5c1489
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class CommandUtilsTest {
+ @Test
+ public void testSplitPath() {
+ assertEquals(Arrays.asList("alpha", "beta"),
+ CommandUtils.splitPath("/alpha/beta"));
+ assertEquals(Arrays.asList("alpha", "beta"),
+ CommandUtils.splitPath("//alpha/beta/"));
+ }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
new file mode 100644
index 00000000000..da3a7ec1081
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class GlobComponentTest {
+ private void verifyIsLiteral(GlobComponent globComponent, String component) {
+ assertTrue(globComponent.literal());
+ assertEquals(component, globComponent.component());
+ assertTrue(globComponent.matches(component));
+ assertFalse(globComponent.matches(component + "foo"));
+ }
+
+ @Test
+ public void testLiteralComponent() {
+ verifyIsLiteral(new GlobComponent("abc"), "abc");
+ verifyIsLiteral(new GlobComponent(""), "");
+ verifyIsLiteral(new GlobComponent("foobar_123"), "foobar_123");
+ verifyIsLiteral(new GlobComponent("$blah+"), "$blah+");
+ }
+
+ @Test
+ public void testToRegularExpression() {
+ assertEquals(null, GlobComponent.toRegularExpression("blah"));
+ assertEquals(null, GlobComponent.toRegularExpression(""));
+ assertEquals(null, GlobComponent.toRegularExpression("does not need a regex, actually"));
+ assertEquals("^\\$blah.*$", GlobComponent.toRegularExpression("$blah*"));
+ assertEquals("^.*$", GlobComponent.toRegularExpression("*"));
+ assertEquals("^foo(?:(?:bar)|(?:baz))$", GlobComponent.toRegularExpression("foo{bar,baz}"));
+ }
+
+ @Test
+ public void testGlobMatch() {
+ GlobComponent star = new GlobComponent("*");
+ assertFalse(star.literal());
+ assertTrue(star.matches(""));
+ assertTrue(star.matches("anything"));
+ GlobComponent question = new GlobComponent("b?b");
+ assertFalse(question.literal());
+ assertFalse(question.matches(""));
+ assertTrue(question.matches("bob"));
+ assertTrue(question.matches("bib"));
+ assertFalse(question.matches("bic"));
+ GlobComponent foobarOrFoobaz = new GlobComponent("foo{bar,baz}");
+ assertFalse(foobarOrFoobaz.literal());
+ assertTrue(foobarOrFoobaz.matches("foobar"));
+ assertTrue(foobarOrFoobaz.matches("foobaz"));
+ assertFalse(foobarOrFoobaz.matches("foobah"));
+ assertFalse(foobarOrFoobaz.matches("foo"));
+ assertFalse(foobarOrFoobaz.matches("baz"));
+ }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java b/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java
new file mode 100644
index 00000000000..59eeb5db79e
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class GlobVisitorTest {
+ static private final MetadataNodeManager.Data DATA;
+
+ static {
+ DATA = new MetadataNodeManager.Data();
+ DATA.root().mkdirs("alpha", "beta", "gamma");
+ DATA.root().mkdirs("alpha", "theta");
+ DATA.root().mkdirs("foo", "a");
+ DATA.root().mkdirs("foo", "beta");
+ DATA.root().mkdirs("zeta").create("c");
+ DATA.root().mkdirs("zeta");
+ DATA.root().create("zzz");
+ DATA.setWorkingDirectory("foo");
+ }
+
+ static class InfoConsumer implements Consumer> {
+ private Optional> infos = null;
+
+ @Override
+ public void accept(Optional info) {
+ if (infos == null) {
+ if (info.isPresent()) {
+ infos = Optional.of(new ArrayList<>());
+ infos.get().add(info.get());
+ } else {
+ infos = Optional.empty();
+ }
+ } else {
+ if (info.isPresent()) {
+ infos.get().add(info.get());
+ } else {
+ throw new RuntimeException("Saw non-empty info after seeing empty info");
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testStarGlob() {
+ InfoConsumer consumer = new InfoConsumer();
+ GlobVisitor visitor = new GlobVisitor("*", consumer);
+ visitor.accept(DATA);
+ assertEquals(Optional.of(Arrays.asList(
+ new MetadataNodeInfo(new String[] {"foo", "a"},
+ DATA.root().directory("foo").child("a")),
+ new MetadataNodeInfo(new String[] {"foo", "beta"},
+ DATA.root().directory("foo").child("beta")))), consumer.infos);
+ }
+
+ @Test
+ public void testDotDot() {
+ InfoConsumer consumer = new InfoConsumer();
+ GlobVisitor visitor = new GlobVisitor("..", consumer);
+ visitor.accept(DATA);
+ assertEquals(Optional.of(Arrays.asList(
+ new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos);
+ }
+
+ @Test
+ public void testDoubleDotDot() {
+ InfoConsumer consumer = new InfoConsumer();
+ GlobVisitor visitor = new GlobVisitor("../..", consumer);
+ visitor.accept(DATA);
+ assertEquals(Optional.of(Arrays.asList(
+ new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos);
+ }
+
+ @Test
+ public void testZGlob() {
+ InfoConsumer consumer = new InfoConsumer();
+ GlobVisitor visitor = new GlobVisitor("../z*", consumer);
+ visitor.accept(DATA);
+ assertEquals(Optional.of(Arrays.asList(
+ new MetadataNodeInfo(new String[] {"zeta"},
+ DATA.root().child("zeta")),
+ new MetadataNodeInfo(new String[] {"zzz"},
+ DATA.root().child("zzz")))), consumer.infos);
+ }
+
+ @Test
+ public void testBetaOrThetaGlob() {
+ InfoConsumer consumer = new InfoConsumer();
+ GlobVisitor visitor = new GlobVisitor("../*/{beta,theta}", consumer);
+ visitor.accept(DATA);
+ assertEquals(Optional.of(Arrays.asList(
+ new MetadataNodeInfo(new String[] {"alpha", "beta"},
+ DATA.root().directory("alpha").child("beta")),
+ new MetadataNodeInfo(new String[] {"alpha", "theta"},
+ DATA.root().directory("alpha").child("theta")),
+ new MetadataNodeInfo(new String[] {"foo", "beta"},
+ DATA.root().directory("foo").child("beta")))), consumer.infos);
+ }
+
+ @Test
+ public void testNotFoundGlob() {
+ InfoConsumer consumer = new InfoConsumer();
+ GlobVisitor visitor = new GlobVisitor("epsilon", consumer);
+ visitor.accept(DATA);
+ assertEquals(Optional.empty(), consumer.infos);
+ }
+
+ @Test
+ public void testAbsoluteGlob() {
+ InfoConsumer consumer = new InfoConsumer();
+ GlobVisitor visitor = new GlobVisitor("/a?pha", consumer);
+ visitor.accept(DATA);
+ assertEquals(Optional.of(Arrays.asList(
+ new MetadataNodeInfo(new String[] {"alpha"},
+ DATA.root().directory("alpha")))), consumer.infos);
+ }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java b/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java
new file mode 100644
index 00000000000..c845706f1b6
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.kafka.shell.LsCommandHandler.ColumnSchema;
+import org.apache.kafka.shell.LsCommandHandler.TargetDirectory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.OptionalInt;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class LsCommandHandlerTest {
+ @Test
+ public void testCalculateColumnSchema() {
+ assertEquals(new ColumnSchema(1, 3),
+ LsCommandHandler.calculateColumnSchema(OptionalInt.empty(),
+ Arrays.asList("abc", "def", "ghi")));
+ assertEquals(new ColumnSchema(1, 2),
+ LsCommandHandler.calculateColumnSchema(OptionalInt.of(0),
+ Arrays.asList("abc", "def")));
+ assertEquals(new ColumnSchema(3, 1).setColumnWidths(3, 8, 6),
+ LsCommandHandler.calculateColumnSchema(OptionalInt.of(80),
+ Arrays.asList("a", "abcdef", "beta")));
+ assertEquals(new ColumnSchema(2, 3).setColumnWidths(10, 7),
+ LsCommandHandler.calculateColumnSchema(OptionalInt.of(18),
+ Arrays.asList("alphabet", "beta", "gamma", "theta", "zeta")));
+ }
+
+ @Test
+ public void testPrintEntries() throws Exception {
+ try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+ try (PrintWriter writer = new PrintWriter(new OutputStreamWriter(
+ stream, StandardCharsets.UTF_8))) {
+ LsCommandHandler.printEntries(writer, "", OptionalInt.of(18),
+ Arrays.asList("alphabet", "beta", "gamma", "theta", "zeta"));
+ }
+ assertEquals(String.join(String.format("%n"), Arrays.asList(
+ "alphabet theta",
+ "beta zeta",
+ "gamma")), stream.toString().trim());
+ }
+ }
+
+ @Test
+ public void testPrintTargets() throws Exception {
+ try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+ try (PrintWriter writer = new PrintWriter(new OutputStreamWriter(
+ stream, StandardCharsets.UTF_8))) {
+ LsCommandHandler.printTargets(writer, OptionalInt.of(18),
+ Arrays.asList("foo", "foobarbaz", "quux"), Arrays.asList(
+ new TargetDirectory("/some/dir",
+ Collections.singletonList("supercalifragalistic")),
+ new TargetDirectory("/some/other/dir",
+ Arrays.asList("capability", "delegation", "elephant",
+ "fungible", "green"))));
+ }
+ assertEquals(String.join(String.format("%n"), Arrays.asList(
+ "foo quux",
+ "foobarbaz ",
+ "",
+ "/some/dir:",
+ "supercalifragalistic",
+ "",
+ "/some/other/dir:",
+ "capability",
+ "delegation",
+ "elephant",
+ "fungible",
+ "green")), stream.toString().trim());
+ }
+ }
+}
+
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
new file mode 100644
index 00000000000..42223c78c80
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class MetadataNodeTest {
+ @Test
+ public void testMkdirs() {
+ DirectoryNode root = new DirectoryNode();
+ DirectoryNode defNode = root.mkdirs("abc", "def");
+ DirectoryNode defNode2 = root.mkdirs("abc", "def");
+ assertTrue(defNode == defNode2);
+ DirectoryNode defNode3 = root.directory("abc", "def");
+ assertTrue(defNode == defNode3);
+ root.mkdirs("ghi");
+ assertEquals(new HashSet<>(Arrays.asList("abc", "ghi")), root.children().keySet());
+ assertEquals(Collections.singleton("def"), root.mkdirs("abc").children().keySet());
+ assertEquals(Collections.emptySet(), defNode.children().keySet());
+ }
+
+ @Test
+ public void testRmrf() {
+ DirectoryNode root = new DirectoryNode();
+ DirectoryNode foo = root.mkdirs("foo");
+ foo.mkdirs("a");
+ foo.mkdirs("b");
+ root.mkdirs("baz");
+ assertEquals(new HashSet<>(Arrays.asList("foo", "baz")), root.children().keySet());
+ root.rmrf("foo", "a");
+ assertEquals(new HashSet<>(Arrays.asList("b")), foo.children().keySet());
+ root.rmrf("foo");
+ assertEquals(new HashSet<>(Collections.singleton("baz")), root.children().keySet());
+ }
+
+ @Test
+ public void testCreateFiles() {
+ DirectoryNode root = new DirectoryNode();
+ DirectoryNode abcdNode = root.mkdirs("abcd");
+ FileNode quuxNodde = abcdNode.create("quux");
+ quuxNodde.setContents("quux contents");
+ assertEquals("quux contents", quuxNodde.contents());
+ assertThrows(NotDirectoryException.class, () -> root.mkdirs("abcd", "quux"));
+ }
+}