
KAFKA-12334: Add the KIP-500 metadata shell

The Kafka Metadata shell is a new command that allows users to
interactively examine the metadata stored in a KIP-500 cluster.
It can examine snapshot files, which are specified via --snapshot.

The metadata tool works by replaying the log and storing the state in
in-memory nodes. These nodes are presented in a fashion similar to
filesystem directories.
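
For example (the snapshot path below is illustrative, not from this commit),
the shell can be launched against a snapshot and then browsed with the cd,
ls, cat, and find commands added by this patch:

    ./bin/kafka-metadata-shell.sh --snapshot /path/to/snapshot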

Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>, Igor Soarez <soarez@apple.com>
Branch: pull/10070/head
Author: Colin P. Mccabe, 4 years ago
Commit: 690f72dd69
  1. bin/kafka-metadata-shell.sh | 17
  2. build.gradle | 43
  3. checkstyle/import-control.xml | 17
  4. checkstyle/suppressions.xml | 4
  5. gradle/dependencies.gradle | 2
  6. metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java | 378
  7. metadata/src/main/resources/common/metadata/IsrChangeRecord.json | 4
  8. metadata/src/main/resources/common/metadata/PartitionRecord.json | 4
  9. metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java | 2
  10. metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java | 2
  11. settings.gradle | 1
  12. shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java | 120
  13. shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java | 117
  14. shell/src/main/java/org/apache/kafka/shell/CommandUtils.java | 148
  15. shell/src/main/java/org/apache/kafka/shell/Commands.java | 154
  16. shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java | 58
  17. shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java | 88
  18. shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java | 121
  19. shell/src/main/java/org/apache/kafka/shell/GlobComponent.java | 179
  20. shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java | 148
  21. shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java | 88
  22. shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java | 108
  23. shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java | 172
  24. shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java | 299
  25. shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java | 109
  26. shell/src/main/java/org/apache/kafka/shell/MetadataNode.java | 140
  27. shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java | 302
  28. shell/src/main/java/org/apache/kafka/shell/MetadataShell.java | 174
  29. shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java | 43
  30. shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java | 30
  31. shell/src/main/java/org/apache/kafka/shell/NotFileException.java | 30
  32. shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java | 89
  33. shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java | 194
  34. shell/src/test/java/org/apache/kafka/shell/CommandTest.java | 70
  35. shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java | 37
  36. shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java | 75
  37. shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java | 144
  38. shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java | 99
  39. shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java | 73

bin/kafka-metadata-shell.sh | 17

@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@"

build.gradle | 43

@@ -1351,6 +1351,49 @@ project(':tools') {
}
}
project(':shell') {
archivesBaseName = "kafka-shell"
dependencies {
compile libs.argparse4j
compile libs.jacksonDatabind
compile libs.jacksonJDK8Datatypes
compile libs.jline
compile libs.slf4jApi
compile project(':clients')
compile project(':core')
compile project(':log4j-appender')
compile project(':metadata')
compile project(':raft')
compile libs.jacksonJaxrsJsonProvider
testCompile project(':clients')
testCompile libs.junitJupiter
testRuntime libs.slf4jlog4j
}
javadoc {
enabled = false
}
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('jline-*jar')
}
from (configurations.runtime) {
include('jline-*jar')
}
into "$buildDir/dependant-libs-${versions.scala}"
duplicatesStrategy 'exclude'
}
jar {
dependsOn 'copyDependantLibs'
}
}
project(':streams') {
archivesBaseName = "kafka-streams"
ext.buildStreamsVersionFileName = "kafka-streams-version.properties"

checkstyle/import-control.xml | 17

@@ -269,6 +269,23 @@
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="shell">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.raft"/>
<allow pkg="kafka.server"/>
<allow pkg="kafka.tools"/>
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.metalog"/>
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.shell"/>
<allow pkg="org.apache.log4j" />
<allow pkg="org.jline"/>
<allow pkg="scala.compat"/>
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.clients.admin" />

checkstyle/suppressions.xml | 4

@@ -253,6 +253,10 @@
<suppress id="dontUseSystemExit"
files="VerifiableProducer.java"/>
<!-- Shell -->
<suppress checks="CyclomaticComplexity"
files="(GlobComponent).java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"
files="KafkaLog4jAppender.java"/>

gradle/dependencies.gradle | 2

@@ -71,6 +71,7 @@ versions += [
jacoco: "0.8.5",
jetty: "9.4.33.v20201020",
jersey: "2.31",
jline: "3.12.1",
jmh: "1.27",
hamcrest: "2.2",
log4j: "1.2.17",
@@ -149,6 +150,7 @@ libs += [
jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
jerseyHk2: "org.glassfish.jersey.inject:jersey-hk2:$versions.jersey",
jline: "org.jline:jline:$versions.jline",
jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",

metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java | 378

@@ -0,0 +1,378 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
/**
* The LocalLogManager is a test implementation that relies on the contents of memory.
*/
public final class LocalLogManager implements MetaLogManager, AutoCloseable {
interface LocalBatch {
int size();
}
static class LeaderChangeBatch implements LocalBatch {
private final MetaLogLeader newLeader;
LeaderChangeBatch(MetaLogLeader newLeader) {
this.newLeader = newLeader;
}
@Override
public int size() {
return 1;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof LeaderChangeBatch)) return false;
LeaderChangeBatch other = (LeaderChangeBatch) o;
if (!other.newLeader.equals(newLeader)) return false;
return true;
}
@Override
public int hashCode() {
return Objects.hash(newLeader);
}
@Override
public String toString() {
return "LeaderChangeBatch(newLeader=" + newLeader + ")";
}
}
static class LocalRecordBatch implements LocalBatch {
private final List<ApiMessage> records;
LocalRecordBatch(List<ApiMessage> records) {
this.records = records;
}
@Override
public int size() {
return records.size();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof LocalRecordBatch)) return false;
LocalRecordBatch other = (LocalRecordBatch) o;
if (!other.records.equals(records)) return false;
return true;
}
@Override
public int hashCode() {
return Objects.hash(records);
}
@Override
public String toString() {
return "LocalRecordBatch(records=" + records + ")";
}
}
public static class SharedLogData {
private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
private MetaLogLeader leader = new MetaLogLeader(-1, -1);
private long prevOffset = -1;
synchronized void registerLogManager(LocalLogManager logManager) {
if (logManagers.put(logManager.nodeId(), logManager) != null) {
throw new RuntimeException("Can't have multiple LocalLogManagers " +
"with id " + logManager.nodeId());
}
electLeaderIfNeeded();
}
synchronized void unregisterLogManager(LocalLogManager logManager) {
if (!logManagers.remove(logManager.nodeId(), logManager)) {
throw new RuntimeException("Log manager " + logManager.nodeId() +
" was not found.");
}
}
synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
if (epoch != leader.epoch()) {
log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
"match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
return Long.MAX_VALUE;
}
if (nodeId != leader.nodeId()) {
log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
"match the current leader id of {}.", nodeId, leader.nodeId());
return Long.MAX_VALUE;
}
log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
long offset = append(batch);
electLeaderIfNeeded();
return offset;
}
synchronized long append(LocalBatch batch) {
prevOffset += batch.size();
log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
batches.put(prevOffset, batch);
if (batch instanceof LeaderChangeBatch) {
LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
leader = leaderChangeBatch.newLeader;
}
for (LocalLogManager logManager : logManagers.values()) {
logManager.scheduleLogCheck();
}
return prevOffset;
}
synchronized void electLeaderIfNeeded() {
if (leader.nodeId() != -1 || logManagers.isEmpty()) {
return;
}
int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
Iterator<Integer> iter = logManagers.keySet().iterator();
Integer nextLeaderNode = null;
for (int i = 0; i <= nextLeaderIndex; i++) {
nextLeaderNode = iter.next();
}
MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
log.info("Elected new leader: {}.", newLeader);
append(new LeaderChangeBatch(newLeader));
}
synchronized Entry<Long, LocalBatch> nextBatch(long offset) {
Entry<Long, LocalBatch> entry = batches.higherEntry(offset);
if (entry == null) {
return null;
}
return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
}
}
private static class MetaLogListenerData {
private long offset = -1;
private final MetaLogListener listener;
MetaLogListenerData(MetaLogListener listener) {
this.listener = listener;
}
}
private final Logger log;
private final int nodeId;
private final SharedLogData shared;
private final EventQueue eventQueue;
private boolean initialized = false;
private boolean shutdown = false;
private long maxReadOffset = Long.MAX_VALUE;
private final List<MetaLogListenerData> listeners = new ArrayList<>();
private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
public LocalLogManager(LogContext logContext,
int nodeId,
SharedLogData shared,
String threadNamePrefix) {
this.log = logContext.logger(LocalLogManager.class);
this.nodeId = nodeId;
this.shared = shared;
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
shared.registerLogManager(this);
}
private void scheduleLogCheck() {
eventQueue.append(() -> {
try {
log.debug("Node {}: running log check.", nodeId);
int numEntriesFound = 0;
for (MetaLogListenerData listenerData : listeners) {
while (true) {
Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset);
if (entry == null) {
log.trace("Node {}: reached the end of the log after finding " +
"{} entries.", nodeId, numEntriesFound);
break;
}
long entryOffset = entry.getKey();
if (entryOffset > maxReadOffset) {
log.trace("Node {}: after {} entries, not reading the next " +
"entry because its offset is {}, and maxReadOffset is {}.",
nodeId, numEntriesFound, entryOffset, maxReadOffset);
break;
}
if (entry.getValue() instanceof LeaderChangeBatch) {
LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
log.trace("Node {}: handling LeaderChange to {}.",
nodeId, batch.newLeader);
listenerData.listener.handleNewLeader(batch.newLeader);
if (batch.newLeader.epoch() > leader.epoch()) {
leader = batch.newLeader;
}
} else if (entry.getValue() instanceof LocalRecordBatch) {
LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
log.trace("Node {}: handling LocalRecordBatch with offset {}.",
nodeId, entryOffset);
listenerData.listener.handleCommits(entryOffset, batch.records);
}
numEntriesFound++;
listenerData.offset = entryOffset;
}
}
log.trace("Completed log check for node " + nodeId);
} catch (Exception e) {
log.error("Exception while handling log check", e);
}
});
}
public void beginShutdown() {
eventQueue.beginShutdown("beginShutdown", () -> {
try {
if (initialized && !shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
renounce(leader.epoch());
for (MetaLogListenerData listenerData : listeners) {
listenerData.listener.beginShutdown();
}
shared.unregisterLogManager(this);
}
} catch (Exception e) {
log.error("Unexpected exception while sending beginShutdown callbacks", e);
}
shutdown = true;
});
}
@Override
public void close() throws InterruptedException {
log.debug("Node {}: closing.", nodeId);
beginShutdown();
eventQueue.close();
}
@Override
public void initialize() throws Exception {
eventQueue.append(() -> {
log.debug("initialized local log manager for node " + nodeId);
initialized = true;
});
}
@Override
public void register(MetaLogListener listener) throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> {
if (shutdown) {
log.info("Node {}: can't register because local log manager has " +
"already been shut down.", nodeId);
future.complete(null);
} else if (initialized) {
log.info("Node {}: registered MetaLogListener.", nodeId);
listeners.add(new MetaLogListenerData(listener));
shared.electLeaderIfNeeded();
scheduleLogCheck();
future.complete(null);
} else {
log.info("Node {}: can't register because local log manager has not " +
"been initialized.", nodeId);
future.completeExceptionally(new RuntimeException(
"LocalLogManager was not initialized."));
}
});
future.get();
}
@Override
public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
batch.stream().map(r -> r.message()).collect(Collectors.toList())));
}
@Override
public void renounce(long epoch) {
MetaLogLeader curLeader = leader;
MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
}
@Override
public MetaLogLeader leader() {
return leader;
}
@Override
public int nodeId() {
return nodeId;
}
public List<MetaLogListener> listeners() {
final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
eventQueue.append(() -> {
future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
});
try {
return future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
public void setMaxReadOffset(long maxReadOffset) {
CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> {
log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
this.maxReadOffset = maxReadOffset;
scheduleLogCheck();
future.complete(null);
});
try {
future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
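
Since LocalLogManager is new in this patch, a usage sketch may help. This is a
hypothetical test-style snippet (not part of the commit) based only on the API
above; the MetaLogListener callbacks it mentions are defined elsewhere in the
metalog package:

    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.metalog.LocalLogManager;
    import org.apache.kafka.metalog.LocalLogManager.SharedLogData;

    public class LocalLogManagerExample {
        public static void main(String[] args) throws Exception {
            // Two managers sharing one SharedLogData act as an in-memory
            // "quorum"; registering the first manager (which happens in the
            // constructor) triggers a random leader election.
            SharedLogData shared = new SharedLogData();
            try (LocalLogManager node0 = new LocalLogManager(new LogContext(), 0, shared, "node0_");
                 LocalLogManager node1 = new LocalLogManager(new LogContext(), 1, shared, "node1_")) {
                node0.initialize();
                node1.initialize();
                // register(...) would attach a MetaLogListener, which then
                // receives handleNewLeader / handleCommits callbacks as
                // batches are appended to the shared log.
                System.out.println("leader = " + node0.leader());
            }
        }
    }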

metadata/src/main/resources/common/metadata/IsrChangeRecord.json | 4

@@ -28,6 +28,8 @@
{ "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
"about": "The lead replica, or -1 if there is no leader." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "An epoch that gets incremented each time we change the ISR." }
"about": "An epoch that gets incremented each time we change the partition leader." },
{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "An epoch that gets incremented each time we change anything in the partition." }
]
}

metadata/src/main/resources/common/metadata/PartitionRecord.json | 4

@@ -34,6 +34,8 @@
{ "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
"about": "The lead replica, or -1 if there is no leader." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "An epoch that gets incremented each time we change the ISR." }
"about": "An epoch that gets incremented each time we change the partition leader." },
{ "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
"about": "An epoch that gets incremented each time we change anything in the partition." }
]
}

metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java | 2

@@ -82,7 +82,7 @@ public class MetadataParserTest {
PartitionRecord partitionRecord = new PartitionRecord().
setReplicas(longReplicaList);
ObjectSerializationCache cache = new ObjectSerializationCache();
assertEquals("Event size would be 33554478, but the maximum serialized event " +
assertEquals("Event size would be 33554482, but the maximum serialized event " +
"size is 33554432", assertThrows(RuntimeException.class, () -> {
MetadataParser.size(partitionRecord, (short) 0, cache);
}).getMessage());

metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java | 2

@@ -51,7 +51,7 @@ public class LocalLogManagerTest {
}
/**
- * Test that the local log maanger will claim leadership.
+ * Test that the local log manager will claim leadership.
*/
@Test
public void testClaimsLeadership() throws Exception {

settings.gradle | 1

@@ -29,6 +29,7 @@ include 'clients',
'log4j-appender',
'metadata',
'raft',
'shell',
'streams',
'streams:examples',
'streams:streams-scala',

shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java | 120

@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.apache.kafka.shell.MetadataNode.FileNode;
import org.jline.reader.Candidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Implements the cat command.
*/
public final class CatCommandHandler implements Commands.Handler {
private static final Logger log = LoggerFactory.getLogger(CatCommandHandler.class);
public final static Commands.Type TYPE = new CatCommandType();
public static class CatCommandType implements Commands.Type {
private CatCommandType() {
}
@Override
public String name() {
return "cat";
}
@Override
public String description() {
return "Show the contents of metadata nodes.";
}
@Override
public boolean shellOnly() {
return false;
}
@Override
public void addArguments(ArgumentParser parser) {
parser.addArgument("targets").
nargs("+").
help("The metadata nodes to display.");
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
return new CatCommandHandler(namespace.getList("targets"));
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
candidates);
}
}
private final List<String> targets;
public CatCommandHandler(List<String> targets) {
this.targets = targets;
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) throws Exception {
log.trace("cat " + targets);
for (String target : targets) {
manager.visit(new GlobVisitor(target, entryOption -> {
if (entryOption.isPresent()) {
MetadataNode node = entryOption.get().node();
if (node instanceof DirectoryNode) {
writer.println("cat: " + target + ": Is a directory");
} else if (node instanceof FileNode) {
FileNode fileNode = (FileNode) node;
writer.println(fileNode.contents());
}
} else {
writer.println("cat: " + target + ": No such file or directory.");
}
}));
}
}
@Override
public int hashCode() {
return targets.hashCode();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof CatCommandHandler)) return false;
CatCommandHandler o = (CatCommandHandler) other;
if (!Objects.equals(o.targets, targets)) return false;
return true;
}
}

shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java | 117

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.jline.reader.Candidate;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
/**
* Implements the cd command.
*/
public final class CdCommandHandler implements Commands.Handler {
public final static Commands.Type TYPE = new CdCommandType();
public static class CdCommandType implements Commands.Type {
private CdCommandType() {
}
@Override
public String name() {
return "cd";
}
@Override
public String description() {
return "Set the current working directory.";
}
@Override
public boolean shellOnly() {
return true;
}
@Override
public void addArguments(ArgumentParser parser) {
parser.addArgument("target").
nargs("?").
help("The directory to change to.");
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
return new CdCommandHandler(Optional.ofNullable(namespace.getString("target")));
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
if (nextWords.size() == 1) {
CommandUtils.completePath(nodeManager, nextWords.get(0), candidates);
}
}
}
private final Optional<String> target;
public CdCommandHandler(Optional<String> target) {
this.target = target;
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) throws Exception {
String effectiveTarget = target.orElse("/");
manager.visit(new Consumer<MetadataNodeManager.Data>() {
@Override
public void accept(MetadataNodeManager.Data data) {
new GlobVisitor(effectiveTarget, entryOption -> {
if (entryOption.isPresent()) {
if (!(entryOption.get().node() instanceof DirectoryNode)) {
writer.println("cd: " + effectiveTarget + ": not a directory.");
} else {
data.setWorkingDirectory(entryOption.get().absolutePath());
}
} else {
writer.println("cd: " + effectiveTarget + ": no such directory.");
}
}).accept(data);
}
});
}
@Override
public int hashCode() {
return target.hashCode();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof CdCommandHandler)) return false;
CdCommandHandler o = (CdCommandHandler) other;
if (!o.target.equals(target)) return false;
return true;
}
}

shell/src/main/java/org/apache/kafka/shell/CommandUtils.java | 148

@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.jline.reader.Candidate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
/**
* Utility functions for command handlers.
*/
public final class CommandUtils {
/**
* Convert a list of paths into the effective list of paths which should be used.
* Empty strings will be removed. If no paths are given, the current working
* directory will be used.
*
* @param paths The input paths. Non-null.
*
* @return The output paths.
*/
public static List<String> getEffectivePaths(List<String> paths) {
List<String> effectivePaths = new ArrayList<>();
for (String path : paths) {
if (!path.isEmpty()) {
effectivePaths.add(path);
}
}
if (effectivePaths.isEmpty()) {
effectivePaths.add(".");
}
return effectivePaths;
}
/**
* Generate a list of potential completions for a prefix of a command name.
*
* @param commandPrefix The command prefix. Non-null.
* @param candidates The list to add the output completions to.
*/
public static void completeCommand(String commandPrefix, List<Candidate> candidates) {
String command = Commands.TYPES.ceilingKey(commandPrefix);
while (command != null && command.startsWith(commandPrefix)) {
candidates.add(new Candidate(command));
command = Commands.TYPES.higherKey(command);
}
}
/**
* Convert a path to a list of path components.
* Multiple slashes in a row are treated the same as a single slash.
* Trailing slashes are ignored.
*/
public static List<String> splitPath(String path) {
List<String> results = new ArrayList<>();
String[] components = path.split("/");
for (int i = 0; i < components.length; i++) {
if (!components[i].isEmpty()) {
results.add(components[i]);
}
}
return results;
}
public static List<String> stripDotPathComponents(List<String> input) {
List<String> output = new ArrayList<>();
for (String string : input) {
if (string.equals("..")) {
if (output.size() > 0) {
output.remove(output.size() - 1);
}
} else if (!string.equals(".")) {
output.add(string);
}
}
return output;
}
/**
* Generate a list of potential completions for a path.
*
* @param nodeManager The NodeManager.
* @param pathPrefix The path prefix. Non-null.
* @param candidates The list to add the output completions to.
*/
public static void completePath(MetadataNodeManager nodeManager,
String pathPrefix,
List<Candidate> candidates) throws Exception {
nodeManager.visit(data -> {
String absolutePath = pathPrefix.startsWith("/") ?
pathPrefix : data.workingDirectory() + "/" + pathPrefix;
List<String> pathComponents = stripDotPathComponents(splitPath(absolutePath));
DirectoryNode directory = data.root();
int numDirectories = pathPrefix.endsWith("/") ?
pathComponents.size() : pathComponents.size() - 1;
for (int i = 0; i < numDirectories; i++) {
MetadataNode node = directory.child(pathComponents.get(i));
if (node == null || !(node instanceof DirectoryNode)) {
return;
}
directory = (DirectoryNode) node;
}
String lastComponent = "";
if (numDirectories >= 0 && numDirectories < pathComponents.size()) {
lastComponent = pathComponents.get(numDirectories);
}
Entry<String, MetadataNode> candidate =
directory.children().ceilingEntry(lastComponent);
String effectivePrefix;
int lastSlash = pathPrefix.lastIndexOf('/');
if (lastSlash < 0) {
effectivePrefix = "";
} else {
effectivePrefix = pathPrefix.substring(0, lastSlash + 1);
}
while (candidate != null && candidate.getKey().startsWith(lastComponent)) {
StringBuilder candidateBuilder = new StringBuilder();
candidateBuilder.append(effectivePrefix).append(candidate.getKey());
boolean complete = true;
if (candidate.getValue() instanceof DirectoryNode) {
candidateBuilder.append("/");
complete = false;
}
candidates.add(new Candidate(candidateBuilder.toString(),
candidateBuilder.toString(), null, null, null, null, complete));
candidate = directory.children().higherEntry(candidate.getKey());
}
});
}
}
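
As a quick illustration of the path helpers above, here is a hypothetical
snippet (not part of the commit) showing what splitPath and
stripDotPathComponents produce; the path strings are illustrative:

    import java.util.Arrays;
    import org.apache.kafka.shell.CommandUtils;

    public class PathExample {
        public static void main(String[] args) {
            // Repeated and trailing slashes are ignored by splitPath.
            System.out.println(CommandUtils.splitPath("/brokers//1/"));   // [brokers, 1]
            // ".." pops the previous component and "." is dropped.
            System.out.println(CommandUtils.stripDotPathComponents(
                Arrays.asList("brokers", "..", ".", "topics")));          // [topics]
        }
    }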

shell/src/main/java/org/apache/kafka/shell/Commands.java | 154

@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import net.sourceforge.argparse4j.internal.HelpScreenException;
import org.jline.reader.Candidate;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
/**
* The commands for the Kafka metadata tool.
*/
public final class Commands {
/**
* A map from command names to command types.
*/
static final NavigableMap<String, Type> TYPES;
static {
TreeMap<String, Type> typesMap = new TreeMap<>();
for (Type type : Arrays.asList(
CatCommandHandler.TYPE,
CdCommandHandler.TYPE,
ExitCommandHandler.TYPE,
FindCommandHandler.TYPE,
HelpCommandHandler.TYPE,
HistoryCommandHandler.TYPE,
LsCommandHandler.TYPE,
ManCommandHandler.TYPE,
PwdCommandHandler.TYPE)) {
typesMap.put(type.name(), type);
}
TYPES = Collections.unmodifiableNavigableMap(typesMap);
}
/**
* Command handler objects are instantiated with specific arguments to
* execute commands.
*/
public interface Handler {
void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) throws Exception;
}
/**
* An object which describes a type of command handler. This includes
* information like its name, help text, and whether it should be accessible
* from non-interactive mode.
*/
public interface Type {
String name();
String description();
boolean shellOnly();
void addArguments(ArgumentParser parser);
Handler createHandler(Namespace namespace);
void completeNext(MetadataNodeManager nodeManager,
List<String> nextWords,
List<Candidate> candidates) throws Exception;
}
private final ArgumentParser parser;
/**
* Create the commands instance.
*
* @param addShellCommands True if we should include the shell-only commands.
*/
public Commands(boolean addShellCommands) {
this.parser = ArgumentParsers.newArgumentParser("", false);
Subparsers subparsers = this.parser.addSubparsers().dest("command");
for (Type type : TYPES.values()) {
if (addShellCommands || !type.shellOnly()) {
Subparser subParser = subparsers.addParser(type.name());
subParser.help(type.description());
type.addArguments(subParser);
}
}
}
ArgumentParser parser() {
return parser;
}
/**
* Handle the given command.
*
* In general this function should not throw exceptions. Instead, it should
* return ErroneousCommandHandler if the input was invalid.
*
* @param arguments The command line arguments.
* @return The command handler.
*/
public Handler parseCommand(List<String> arguments) {
List<String> trimmedArguments = new ArrayList<>(arguments);
while (true) {
if (trimmedArguments.isEmpty()) {
return new NoOpCommandHandler();
}
String last = trimmedArguments.get(trimmedArguments.size() - 1);
if (!last.isEmpty()) {
break;
}
trimmedArguments.remove(trimmedArguments.size() - 1);
}
Namespace namespace;
try {
namespace = parser.parseArgs(trimmedArguments.toArray(new String[0]));
} catch (HelpScreenException e) {
return new NoOpCommandHandler();
} catch (ArgumentParserException e) {
return new ErroneousCommandHandler(e.getMessage());
}
String command = namespace.get("command");
if (!command.equals(trimmedArguments.get(0))) {
return new ErroneousCommandHandler("invalid choice: '" +
trimmedArguments.get(0) + "': did you mean '" + command + "'?");
}
Type type = TYPES.get(command);
if (type == null) {
return new ErroneousCommandHandler("Unknown command specified: " + command);
} else {
return type.createHandler(namespace);
}
}
}
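
To make the parse-then-run flow concrete, here is a hypothetical
non-interactive driver (not part of the commit); it assumes a
MetadataNodeManager instance, which this section does not show being
constructed:

    import java.io.PrintWriter;
    import java.util.Arrays;
    import java.util.Optional;
    import org.apache.kafka.shell.Commands;
    import org.apache.kafka.shell.MetadataNodeManager;

    public class RunOnce {
        static void runOnce(MetadataNodeManager manager, String... words) throws Exception {
            Commands commands = new Commands(false);  // false: exclude shell-only commands
            // parseCommand never throws on bad input; it returns an
            // ErroneousCommandHandler (or a NoOpCommandHandler for empty input).
            Commands.Handler handler = commands.parseCommand(Arrays.asList(words));
            PrintWriter writer = new PrintWriter(System.out, true);
            // An empty shell Optional signals non-interactive mode.
            handler.run(Optional.empty(), writer, manager);
            writer.flush();
        }
    }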

shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java | 58

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import java.io.PrintWriter;
import java.util.Objects;
import java.util.Optional;
/**
* Handles erroneous commands.
*/
public final class ErroneousCommandHandler implements Commands.Handler {
private final String message;
public ErroneousCommandHandler(String message) {
this.message = message;
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) {
writer.println(message);
}
@Override
public int hashCode() {
return Objects.hashCode(message);
}
@Override
public boolean equals(Object other) {
if (!(other instanceof ErroneousCommandHandler)) return false;
ErroneousCommandHandler o = (ErroneousCommandHandler) other;
if (!Objects.equals(o.message, message)) return false;
return true;
}
@Override
public String toString() {
return "ErroneousCommandHandler(" + message + ")";
}
}

shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java | 88

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.jline.reader.Candidate;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
/**
* Implements the exit command.
*/
public final class ExitCommandHandler implements Commands.Handler {
public final static Commands.Type TYPE = new ExitCommandType();
public static class ExitCommandType implements Commands.Type {
private ExitCommandType() {
}
@Override
public String name() {
return "exit";
}
@Override
public String description() {
return "Exit the metadata shell.";
}
@Override
public boolean shellOnly() {
return true;
}
@Override
public void addArguments(ArgumentParser parser) {
// nothing to do
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
return new ExitCommandHandler();
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
// nothing to do
}
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) {
Exit.exit(0);
}
@Override
public int hashCode() {
return 0;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof ExitCommandHandler)) return false;
return true;
}
}

shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java | 121

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.jline.reader.Candidate;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
/**
* Implements the find command.
*/
public final class FindCommandHandler implements Commands.Handler {
public final static Commands.Type TYPE = new FindCommandType();
public static class FindCommandType implements Commands.Type {
private FindCommandType() {
}
@Override
public String name() {
return "find";
}
@Override
public String description() {
return "Search for nodes in the directory hierarchy.";
}
@Override
public boolean shellOnly() {
return false;
}
@Override
public void addArguments(ArgumentParser parser) {
parser.addArgument("paths").
nargs("*").
help("The paths to start at.");
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
return new FindCommandHandler(namespace.getList("paths"));
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
candidates);
}
}
private final List<String> paths;
public FindCommandHandler(List<String> paths) {
this.paths = paths;
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) throws Exception {
for (String path : CommandUtils.getEffectivePaths(paths)) {
manager.visit(new GlobVisitor(path, entryOption -> {
if (entryOption.isPresent()) {
find(writer, path, entryOption.get().node());
} else {
writer.println("find: " + path + ": no such file or directory.");
}
}));
}
}
private void find(PrintWriter writer, String path, MetadataNode node) {
writer.println(path);
if (node instanceof DirectoryNode) {
DirectoryNode directory = (DirectoryNode) node;
for (Entry<String, MetadataNode> entry : directory.children().entrySet()) {
String nextPath = path.equals("/") ?
path + entry.getKey() : path + "/" + entry.getKey();
find(writer, nextPath, entry.getValue());
}
}
}
@Override
public int hashCode() {
return Objects.hashCode(paths);
}
@Override
public boolean equals(Object other) {
if (!(other instanceof FindCommandHandler)) return false;
FindCommandHandler o = (FindCommandHandler) other;
if (!Objects.equals(o.paths, paths)) return false;
return true;
}
}

shell/src/main/java/org/apache/kafka/shell/GlobComponent.java | 179

@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Pattern;
/**
* Implements a per-path-component glob.
*/
public final class GlobComponent {
private static final Logger log = LoggerFactory.getLogger(GlobComponent.class);
/**
* Returns true if the character is a special character for regular expressions.
*/
private static boolean isRegularExpressionSpecialCharacter(char ch) {
switch (ch) {
case '$':
case '(':
case ')':
case '+':
case '.':
case '[':
case ']':
case '^':
case '{':
case '|':
return true;
default:
break;
}
return false;
}
/**
* Returns true if the character is a special character for globs.
*/
private static boolean isGlobSpecialCharacter(char ch) {
switch (ch) {
case '*':
case '?':
case '\\':
case '{':
case '}':
return true;
default:
break;
}
return false;
}
/**
* Converts a glob string to a regular expression string.
* Returns null if the glob should be handled as a literal (can only match one string).
* Throws an exception if the glob is malformed.
*/
static String toRegularExpression(String glob) {
StringBuilder output = new StringBuilder("^");
boolean literal = true;
boolean processingGroup = false;
for (int i = 0; i < glob.length(); ) {
char c = glob.charAt(i++);
switch (c) {
case '?':
literal = false;
output.append(".");
break;
case '*':
literal = false;
output.append(".*");
break;
case '\\':
if (i == glob.length()) {
output.append(c);
} else {
char next = glob.charAt(i);
i++;
if (isGlobSpecialCharacter(next) ||
isRegularExpressionSpecialCharacter(next)) {
output.append('\\');
}
output.append(next);
}
break;
case '{':
if (processingGroup) {
throw new RuntimeException("Can't nest glob groups.");
}
literal = false;
output.append("(?:(?:");
processingGroup = true;
break;
case ',':
if (processingGroup) {
literal = false;
output.append(")|(?:");
} else {
output.append(c);
}
break;
case '}':
if (processingGroup) {
literal = false;
output.append("))");
processingGroup = false;
} else {
output.append(c);
}
break;
// TODO: handle character ranges
default:
if (isRegularExpressionSpecialCharacter(c)) {
output.append('\\');
}
output.append(c);
}
}
if (processingGroup) {
throw new RuntimeException("Unterminated glob group.");
}
if (literal) {
return null;
}
output.append('$');
return output.toString();
}
private final String component;
private final Pattern pattern;
public GlobComponent(String component) {
this.component = component;
Pattern newPattern = null;
try {
String regularExpression = toRegularExpression(component);
if (regularExpression != null) {
newPattern = Pattern.compile(regularExpression);
}
} catch (RuntimeException e) {
log.debug("Invalid glob pattern: " + e.getMessage());
}
this.pattern = newPattern;
}
public String component() {
return component;
}
public boolean literal() {
return pattern == null;
}
public boolean matches(String nodeName) {
if (pattern == null) {
return component.equals(nodeName);
} else {
return pattern.matcher(nodeName).matches();
}
}
}
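
A hypothetical snippet (not part of the commit) showing the glob matching
above in action; the component names are illustrative:

    import org.apache.kafka.shell.GlobComponent;

    public class GlobExample {
        public static void main(String[] args) {
            // "broker*" contains a wildcard, so it is compiled to the regex ^broker.*$
            GlobComponent wild = new GlobComponent("broker*");
            System.out.println(wild.literal());           // false
            System.out.println(wild.matches("brokers"));  // true
            // "topics" has no special characters, so it stays a literal and is
            // compared with String.equals rather than a regex.
            GlobComponent plain = new GlobComponent("topics");
            System.out.println(plain.literal());          // true
            System.out.println(plain.matches("topic"));   // false
        }
    }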

shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java | 148

@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
/**
* Visits metadata paths based on a glob string.
*/
public final class GlobVisitor implements Consumer<MetadataNodeManager.Data> {
private final String glob;
private final Consumer<Optional<MetadataNodeInfo>> handler;
public GlobVisitor(String glob,
Consumer<Optional<MetadataNodeInfo>> handler) {
this.glob = glob;
this.handler = handler;
}
public static class MetadataNodeInfo {
private final String[] path;
private final MetadataNode node;
MetadataNodeInfo(String[] path, MetadataNode node) {
this.path = path;
this.node = node;
}
public String[] path() {
return path;
}
public MetadataNode node() {
return node;
}
public String lastPathComponent() {
if (path.length == 0) {
return "/";
} else {
return path[path.length - 1];
}
}
public String absolutePath() {
return "/" + String.join("/", path);
}
@Override
public int hashCode() {
return Objects.hash(path, node);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof MetadataNodeInfo)) return false;
MetadataNodeInfo other = (MetadataNodeInfo) o;
if (!Arrays.equals(path, other.path)) return false;
if (!node.equals(other.node)) return false;
return true;
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder("MetadataNodeInfo(path=");
for (int i = 0; i < path.length; i++) {
bld.append("/");
bld.append(path[i]);
}
bld.append(", node=").append(node).append(")");
return bld.toString();
}
}
@Override
public void accept(MetadataNodeManager.Data data) {
String fullGlob = glob.startsWith("/") ? glob :
data.workingDirectory() + "/" + glob;
List<String> globComponents =
CommandUtils.stripDotPathComponents(CommandUtils.splitPath(fullGlob));
if (!accept(globComponents, 0, data.root(), new String[0])) {
handler.accept(Optional.empty());
}
}
private boolean accept(List<String> globComponents,
int componentIndex,
MetadataNode node,
String[] path) {
if (componentIndex >= globComponents.size()) {
handler.accept(Optional.of(new MetadataNodeInfo(path, node)));
return true;
}
String globComponentString = globComponents.get(componentIndex);
GlobComponent globComponent = new GlobComponent(globComponentString);
if (globComponent.literal()) {
if (!(node instanceof MetadataNode.DirectoryNode)) {
return false;
}
MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node;
MetadataNode child = directory.child(globComponent.component());
if (child == null) {
return false;
}
String[] newPath = new String[path.length + 1];
System.arraycopy(path, 0, newPath, 0, path.length);
newPath[path.length] = globComponent.component();
return accept(globComponents, componentIndex + 1, child, newPath);
}
if (!(node instanceof MetadataNode.DirectoryNode)) {
return false;
}
MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node;
boolean matchedAny = false;
for (Entry<String, MetadataNode> entry : directory.children().entrySet()) {
String nodeName = entry.getKey();
if (globComponent.matches(nodeName)) {
String[] newPath = new String[path.length + 1];
System.arraycopy(path, 0, newPath, 0, path.length);
newPath[path.length] = nodeName;
if (accept(globComponents, componentIndex + 1, entry.getValue(), newPath)) {
matchedAny = true;
}
}
}
return matchedAny;
}
}
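
A hypothetical usage sketch (not part of the commit), mirroring how the cat
and find handlers above drive this class; it assumes MetadataNodeManager.visit
accepts the Consumer, as those handlers suggest, and that a manager instance
already exists:

    import org.apache.kafka.shell.GlobVisitor;
    import org.apache.kafka.shell.MetadataNodeManager;

    public class GlobVisitorExample {
        // Prints the absolute path of every node matching the glob. The handler
        // is invoked once per match, or once with Optional.empty() if nothing
        // matched at all.
        static void printMatches(MetadataNodeManager manager, String glob) throws Exception {
            manager.visit(new GlobVisitor(glob, entryOption -> {
                if (entryOption.isPresent()) {
                    System.out.println(entryOption.get().absolutePath());
                } else {
                    System.out.println(glob + ": no such file or directory.");
                }
            }));
        }
    }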

shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java | 88

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.jline.reader.Candidate;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
/**
* Implements the help command.
*/
public final class HelpCommandHandler implements Commands.Handler {
public final static Commands.Type TYPE = new HelpCommandType();
public static class HelpCommandType implements Commands.Type {
private HelpCommandType() {
}
@Override
public String name() {
return "help";
}
@Override
public String description() {
return "Display this help message.";
}
@Override
public boolean shellOnly() {
return true;
}
@Override
public void addArguments(ArgumentParser parser) {
// nothing to do
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
return new HelpCommandHandler();
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
// nothing to do
}
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) {
writer.printf("Welcome to the Apache Kafka metadata shell.%n%n");
new Commands(true).parser().printHelp(writer);
}
@Override
public int hashCode() {
return 0;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof HelpCommandHandler)) return false;
return true;
}
}

shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java | 108

@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.jline.reader.Candidate;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Implements the history command.
*/
public final class HistoryCommandHandler implements Commands.Handler {
public final static Commands.Type TYPE = new HistoryCommandType();
public static class HistoryCommandType implements Commands.Type {
private HistoryCommandType() {
}
@Override
public String name() {
return "history";
}
@Override
public String description() {
return "Print command history.";
}
@Override
public boolean shellOnly() {
return true;
}
@Override
public void addArguments(ArgumentParser parser) {
parser.addArgument("numEntriesToShow").
nargs("?").
type(Integer.class).
help("The number of entries to show.");
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
Integer numEntriesToShow = namespace.getInt("numEntriesToShow");
return new HistoryCommandHandler(numEntriesToShow == null ?
Integer.MAX_VALUE : numEntriesToShow);
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
// nothing to do
}
}
private final int numEntriesToShow;
public HistoryCommandHandler(int numEntriesToShow) {
this.numEntriesToShow = numEntriesToShow;
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) throws Exception {
if (!shell.isPresent()) {
throw new RuntimeException("The history command requires a shell.");
}
Iterator<Map.Entry<Integer, String>> iter = shell.get().history(numEntriesToShow);
while (iter.hasNext()) {
Map.Entry<Integer, String> entry = iter.next();
writer.printf("% 5d %s%n", entry.getKey(), entry.getValue());
}
}
@Override
public int hashCode() {
return numEntriesToShow;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof HistoryCommandHandler)) return false;
HistoryCommandHandler o = (HistoryCommandHandler) other;
return o.numEntriesToShow == numEntriesToShow;
}
}
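
A note on the output format above: "% 5d %s%n" combines the space flag with a width of five, so each history index is right-aligned in a fixed column. A minimal standalone sketch of the formatting (not part of this patch; the sample command string is hypothetical):

import java.io.PrintWriter;

public class HistoryFormatDemo {
    public static void main(String[] args) {
        PrintWriter writer = new PrintWriter(System.out, true);
        // Prints "    7 ls /brokers": the index occupies at least five
        // characters, padded on the left, as in HistoryCommandHandler.run().
        writer.printf("% 5d %s%n", 7, "ls /brokers");
    }
}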

172
shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java

@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import org.jline.reader.Candidate;
import org.jline.reader.Completer;
import org.jline.reader.EndOfFileException;
import org.jline.reader.History;
import org.jline.reader.LineReader;
import org.jline.reader.LineReaderBuilder;
import org.jline.reader.ParsedLine;
import org.jline.reader.Parser;
import org.jline.reader.UserInterruptException;
import org.jline.reader.impl.DefaultParser;
import org.jline.reader.impl.history.DefaultHistory;
import org.jline.terminal.Terminal;
import org.jline.terminal.TerminalBuilder;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
/**
* The Kafka metadata shell.
*/
public final class InteractiveShell implements AutoCloseable {
static class MetadataShellCompleter implements Completer {
private final MetadataNodeManager nodeManager;
MetadataShellCompleter(MetadataNodeManager nodeManager) {
this.nodeManager = nodeManager;
}
@Override
public void complete(LineReader reader, ParsedLine line, List<Candidate> candidates) {
if (line.words().size() == 0) {
CommandUtils.completeCommand("", candidates);
} else if (line.words().size() == 1) {
CommandUtils.completeCommand(line.words().get(0), candidates);
} else {
Iterator<String> iter = line.words().iterator();
String command = iter.next();
List<String> nextWords = new ArrayList<>();
while (iter.hasNext()) {
nextWords.add(iter.next());
}
Commands.Type type = Commands.TYPES.get(command);
if (type == null) {
return;
}
try {
type.completeNext(nodeManager, nextWords, candidates);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
private final MetadataNodeManager nodeManager;
private final Terminal terminal;
private final Parser parser;
private final History history;
private final MetadataShellCompleter completer;
private final LineReader reader;
public InteractiveShell(MetadataNodeManager nodeManager) throws IOException {
this.nodeManager = nodeManager;
TerminalBuilder builder = TerminalBuilder.builder().
system(true).
nativeSignals(true);
this.terminal = builder.build();
this.parser = new DefaultParser();
this.history = new DefaultHistory();
this.completer = new MetadataShellCompleter(nodeManager);
this.reader = LineReaderBuilder.builder().
terminal(terminal).
parser(parser).
history(history).
completer(completer).
option(LineReader.Option.AUTO_FRESH_LINE, false).
build();
}
public void runMainLoop() throws Exception {
terminal.writer().println("[ Kafka Metadata Shell ]");
terminal.flush();
Commands commands = new Commands(true);
while (true) {
try {
reader.readLine(">> ");
ParsedLine parsedLine = reader.getParsedLine();
Commands.Handler handler = commands.parseCommand(parsedLine.words());
handler.run(Optional.of(this), terminal.writer(), nodeManager);
terminal.writer().flush();
} catch (UserInterruptException e) {
// Handle the user pressing control-C.
terminal.writer().println("^C");
} catch (EndOfFileException eof) {
return;
}
}
}
public int screenWidth() {
return terminal.getWidth();
}
public Iterator<Entry<Integer, String>> history(int numEntriesToShow) {
if (numEntriesToShow < 0) {
numEntriesToShow = 0;
}
int last = history.last();
if (numEntriesToShow > last + 1) {
numEntriesToShow = last + 1;
}
int first = last - numEntriesToShow + 1;
if (first < history.first()) {
first = history.first();
}
return new HistoryIterator(first, last);
}
public class HistoryIterator implements Iterator<Entry<Integer, String>> {
private int index;
private int last;
HistoryIterator(int index, int last) {
this.index = index;
this.last = last;
}
@Override
public boolean hasNext() {
return index <= last;
}
@Override
public Entry<Integer, String> next() {
if (index > last) {
throw new NoSuchElementException();
}
int p = index++;
return new AbstractMap.SimpleImmutableEntry<>(p, history.get(p));
}
}
@Override
public void close() throws IOException {
terminal.close();
}
}
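
The index arithmetic in history() above is easiest to check with concrete numbers. A minimal sketch of the same clamping logic (standalone, not part of this patch), assuming a history whose entries run from first() = 0 to last() = 9:

public class HistoryWindowDemo {
    // Mirrors InteractiveShell.history(): clamp the requested count, then
    // derive the first index of the window that ends at `last`.
    static int firstIndex(int first, int last, int numEntriesToShow) {
        if (numEntriesToShow < 0) numEntriesToShow = 0;
        if (numEntriesToShow > last + 1) numEntriesToShow = last + 1;
        return Math.max(last - numEntriesToShow + 1, first);
    }

    public static void main(String[] args) {
        System.out.println(firstIndex(0, 9, 3));   // 7: yields entries 7..9
        System.out.println(firstIndex(0, 9, 100)); // 0: clamped to the full range
    }
}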

299
shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java

@@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.apache.kafka.shell.MetadataNode.FileNode;
import org.jline.reader.Candidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
/**
* Implements the ls command.
*/
public final class LsCommandHandler implements Commands.Handler {
private static final Logger log = LoggerFactory.getLogger(LsCommandHandler.class);
public final static Commands.Type TYPE = new LsCommandType();
public static class LsCommandType implements Commands.Type {
private LsCommandType() {
}
@Override
public String name() {
return "ls";
}
@Override
public String description() {
return "List metadata nodes.";
}
@Override
public boolean shellOnly() {
return false;
}
@Override
public void addArguments(ArgumentParser parser) {
parser.addArgument("targets").
nargs("*").
help("The metadata node paths to list.");
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
return new LsCommandHandler(namespace.getList("targets"));
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
candidates);
}
}
private final List<String> targets;
public LsCommandHandler(List<String> targets) {
this.targets = targets;
}
static class TargetDirectory {
private final String name;
private final List<String> children;
TargetDirectory(String name, List<String> children) {
this.name = name;
this.children = children;
}
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) throws Exception {
List<String> targetFiles = new ArrayList<>();
List<TargetDirectory> targetDirectories = new ArrayList<>();
for (String target : CommandUtils.getEffectivePaths(targets)) {
manager.visit(new GlobVisitor(target, entryOption -> {
if (entryOption.isPresent()) {
MetadataNodeInfo info = entryOption.get();
MetadataNode node = info.node();
if (node instanceof DirectoryNode) {
DirectoryNode directory = (DirectoryNode) node;
List<String> children = new ArrayList<>();
children.addAll(directory.children().keySet());
targetDirectories.add(
new TargetDirectory(info.lastPathComponent(), children));
} else if (node instanceof FileNode) {
targetFiles.add(info.lastPathComponent());
}
} else {
writer.println("ls: " + target + ": no such file or directory.");
}
}));
}
OptionalInt screenWidth = shell.isPresent() ?
OptionalInt.of(shell.get().screenWidth()) : OptionalInt.empty();
log.trace("LS : targetFiles = {}, targetDirectories = {}, screenWidth = {}",
targetFiles, targetDirectories, screenWidth);
printTargets(writer, screenWidth, targetFiles, targetDirectories);
}
static void printTargets(PrintWriter writer,
OptionalInt screenWidth,
List<String> targetFiles,
List<TargetDirectory> targetDirectories) {
printEntries(writer, "", screenWidth, targetFiles);
boolean needIntro = targetFiles.size() > 0 || targetDirectories.size() > 1;
boolean firstIntro = targetFiles.isEmpty();
for (TargetDirectory targetDirectory : targetDirectories) {
String intro = "";
if (needIntro) {
if (!firstIntro) {
intro = intro + String.format("%n");
}
intro = intro + targetDirectory.name + ":";
firstIntro = false;
}
log.trace("LS : targetDirectory name = {}, children = {}",
targetDirectory.name, targetDirectory.children);
printEntries(writer, intro, screenWidth, targetDirectory.children);
}
}
static void printEntries(PrintWriter writer,
String intro,
OptionalInt screenWidth,
List<String> entries) {
if (entries.isEmpty()) {
return;
}
if (!intro.isEmpty()) {
writer.println(intro);
}
ColumnSchema columnSchema = calculateColumnSchema(screenWidth, entries);
int numColumns = columnSchema.numColumns();
int numLines = (entries.size() + numColumns - 1) / numColumns;
for (int line = 0; line < numLines; line++) {
StringBuilder output = new StringBuilder();
for (int column = 0; column < numColumns; column++) {
int entryIndex = line + (column * columnSchema.entriesPerColumn());
if (entryIndex < entries.size()) {
String entry = entries.get(entryIndex);
output.append(entry);
if (column < numColumns - 1) {
int width = columnSchema.columnWidth(column);
for (int i = 0; i < width - entry.length(); i++) {
output.append(" ");
}
}
}
}
writer.println(output.toString());
}
}
static ColumnSchema calculateColumnSchema(OptionalInt screenWidth,
List<String> entries) {
if (!screenWidth.isPresent()) {
return new ColumnSchema(1, entries.size());
}
int maxColumns = screenWidth.getAsInt() / 4;
if (maxColumns <= 1) {
return new ColumnSchema(1, entries.size());
}
ColumnSchema[] schemas = new ColumnSchema[maxColumns];
for (int numColumns = 1; numColumns <= maxColumns; numColumns++) {
schemas[numColumns - 1] = new ColumnSchema(numColumns,
(entries.size() + numColumns - 1) / numColumns);
}
for (int i = 0; i < entries.size(); i++) {
String entry = entries.get(i);
for (int s = 0; s < schemas.length; s++) {
ColumnSchema schema = schemas[s];
schema.process(i, entry);
}
}
for (int s = schemas.length - 1; s > 0; s--) {
ColumnSchema schema = schemas[s];
if (schema.columnWidths[schema.columnWidths.length - 1] != 0 &&
schema.totalWidth() <= screenWidth.getAsInt()) {
return schema;
}
}
return schemas[0];
}
static class ColumnSchema {
private final int[] columnWidths;
private final int entriesPerColumn;
ColumnSchema(int numColumns, int entriesPerColumn) {
this.columnWidths = new int[numColumns];
this.entriesPerColumn = entriesPerColumn;
}
ColumnSchema setColumnWidths(Integer... widths) {
for (int i = 0; i < widths.length; i++) {
columnWidths[i] = widths[i];
}
return this;
}
void process(int entryIndex, String output) {
int columnIndex = entryIndex / entriesPerColumn;
columnWidths[columnIndex] = Math.max(
columnWidths[columnIndex], output.length() + 2);
}
int totalWidth() {
int total = 0;
for (int i = 0; i < columnWidths.length; i++) {
total += columnWidths[i];
}
return total;
}
int numColumns() {
return columnWidths.length;
}
int columnWidth(int columnIndex) {
return columnWidths[columnIndex];
}
int entriesPerColumn() {
return entriesPerColumn;
}
@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(columnWidths), entriesPerColumn);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof ColumnSchema)) return false;
ColumnSchema other = (ColumnSchema) o;
if (entriesPerColumn != other.entriesPerColumn) return false;
if (!Arrays.equals(columnWidths, other.columnWidths)) return false;
return true;
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder("ColumnSchema(columnWidths=[");
String prefix = "";
for (int i = 0; i < columnWidths.length; i++) {
bld.append(prefix);
bld.append(columnWidths[i]);
prefix = ", ";
}
bld.append("], entriesPerColumn=").append(entriesPerColumn).append(")");
return bld.toString();
}
}
@Override
public int hashCode() {
return Objects.hashCode(targets);
}
@Override
public boolean equals(Object other) {
if (!(other instanceof LsCommandHandler)) return false;
LsCommandHandler o = (LsCommandHandler) other;
if (!Objects.equals(o.targets, targets)) return false;
return true;
}
}
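
To make the schema selection concrete: calculateColumnSchema() builds one candidate per possible column count, process() widens each column to its longest entry plus two spaces of padding, and the final loop picks the candidate with the most columns whose last column is actually used and whose total width fits the screen. A worked example (values chosen for illustration, not taken from a test in this patch): for entries [a, bb, ccc, dd, e] and a 20-character screen, maxColumns = 20 / 4 = 5, and the five-column candidate has entriesPerColumn = 1 and column widths [3, 4, 5, 4, 3], for a total of 19 <= 20. It is therefore chosen, and ls prints the single row "a  bb  ccc  dd  e".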

109
shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.jline.reader.Candidate;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
/**
* Implements the man command.
*/
public final class ManCommandHandler implements Commands.Handler {
private final String cmd;
public final static Commands.Type TYPE = new ManCommandType();
public static class ManCommandType implements Commands.Type {
private ManCommandType() {
}
@Override
public String name() {
return "man";
}
@Override
public String description() {
return "Show the help text for a specific command.";
}
@Override
public boolean shellOnly() {
return true;
}
@Override
public void addArguments(ArgumentParser parser) {
parser.addArgument("cmd").
nargs(1).
help("The command to get help text for.");
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
return new ManCommandHandler(namespace.<String>getList("cmd").get(0));
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
if (nextWords.size() == 1) {
CommandUtils.completeCommand(nextWords.get(0), candidates);
}
}
}
public ManCommandHandler(String cmd) {
this.cmd = cmd;
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) {
Commands.Type type = Commands.TYPES.get(cmd);
if (type == null) {
writer.println("man: unknown command " + cmd +
". Type help to get a list of commands.");
} else {
ArgumentParser parser = ArgumentParsers.newArgumentParser(type.name(), false);
type.addArguments(parser);
writer.printf("%s: %s%n%n", cmd, type.description());
parser.printHelp(writer);
}
}
@Override
public int hashCode() {
return cmd.hashCode();
}
@Override
public boolean equals(Object other) {
if (!(other instanceof ManCommandHandler)) return false;
ManCommandHandler o = (ManCommandHandler) other;
if (!o.cmd.equals(cmd)) return false;
return true;
}
}

140
shell/src/main/java/org/apache/kafka/shell/MetadataNode.java

@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import java.util.NavigableMap;
import java.util.TreeMap;
/**
* A node in the metadata tool.
*/
public interface MetadataNode {
class DirectoryNode implements MetadataNode {
private final TreeMap<String, MetadataNode> children = new TreeMap<>();
public DirectoryNode mkdirs(String... names) {
if (names.length == 0) {
throw new RuntimeException("Invalid zero-length path");
}
DirectoryNode node = this;
for (int i = 0; i < names.length; i++) {
MetadataNode nextNode = node.children.get(names[i]);
if (nextNode == null) {
nextNode = new DirectoryNode();
node.children.put(names[i], nextNode);
} else {
if (!(nextNode instanceof DirectoryNode)) {
throw new NotDirectoryException();
}
}
node = (DirectoryNode) nextNode;
}
return node;
}
public void rmrf(String... names) {
if (names.length == 0) {
throw new RuntimeException("Invalid zero-length path");
}
DirectoryNode node = this;
for (int i = 0; i < names.length - 1; i++) {
MetadataNode nextNode = node.children.get(names[i]);
if (nextNode == null || !(nextNode instanceof DirectoryNode)) {
throw new RuntimeException("Unable to locate directory /" +
String.join("/", names));
}
node = (DirectoryNode) nextNode;
}
node.children.remove(names[names.length - 1]);
}
public FileNode create(String name) {
MetadataNode node = children.get(name);
if (node == null) {
node = new FileNode();
children.put(name, node);
} else {
if (!(node instanceof FileNode)) {
throw new NotFileException();
}
}
return (FileNode) node;
}
public MetadataNode child(String component) {
return children.get(component);
}
public NavigableMap<String, MetadataNode> children() {
return children;
}
public void addChild(String name, DirectoryNode child) {
children.put(name, child);
}
public DirectoryNode directory(String... names) {
if (names.length == 0) {
throw new RuntimeException("Invalid zero-length path");
}
DirectoryNode node = this;
for (int i = 0; i < names.length; i++) {
MetadataNode nextNode = node.children.get(names[i]);
if (nextNode == null || !(nextNode instanceof DirectoryNode)) {
throw new RuntimeException("Unable to locate directory /" +
String.join("/", names));
}
node = (DirectoryNode) nextNode;
}
return node;
}
public FileNode file(String... names) {
if (names.length == 0) {
throw new RuntimeException("Invalid zero-length path");
}
DirectoryNode node = this;
for (int i = 0; i < names.length - 1; i++) {
MetadataNode nextNode = node.children.get(names[i]);
if (nextNode == null || !(nextNode instanceof DirectoryNode)) {
throw new RuntimeException("Unable to locate file /" +
String.join("/", names));
}
node = (DirectoryNode) nextNode;
}
MetadataNode nextNode = node.child(names[names.length - 1]);
if (nextNode == null || !(nextNode instanceof FileNode)) {
throw new RuntimeException("Unable to locate file /" +
String.join("/", names));
}
return (FileNode) nextNode;
}
}
class FileNode implements MetadataNode {
private String contents;
void setContents(String contents) {
this.contents = contents;
}
String contents() {
return contents;
}
}
}
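
The tree API above is driven by the record handlers in MetadataNodeManager; a minimal standalone sketch of the same calls (not part of this patch, and placed in org.apache.kafka.shell because FileNode's content accessors are package-private):

package org.apache.kafka.shell;

public class MetadataNodeDemo {
    public static void main(String[] args) {
        // Build /topics/foo/name the way the record handlers do.
        MetadataNode.DirectoryNode root = new MetadataNode.DirectoryNode();
        MetadataNode.DirectoryNode topic = root.mkdirs("topics", "foo");
        topic.create("name").setContents("foo");

        // directory() and file() walk the same path back down.
        System.out.println(root.file("topics", "foo", "name").contents()); // foo
        System.out.println(root.directory("topics").children().keySet());  // [foo]
    }
}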

302
shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java

@@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.IsrChangeRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.apache.kafka.shell.MetadataNode.FileNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* Maintains the in-memory metadata for the metadata tool.
*/
public final class MetadataNodeManager implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
public static class Data {
private final DirectoryNode root = new DirectoryNode();
private String workingDirectory = "/";
public DirectoryNode root() {
return root;
}
public String workingDirectory() {
return workingDirectory;
}
public void setWorkingDirectory(String workingDirectory) {
this.workingDirectory = workingDirectory;
}
}
class LogListener implements MetaLogListener, RaftClient.Listener<ApiMessageAndVersion> {
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
// TODO: handle lastOffset
while (reader.hasNext()) {
BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
handleMessage(messageAndVersion.message());
}
}
} finally {
reader.close();
}
}
@Override
public void handleCommits(long lastOffset, List<ApiMessage> messages) {
appendEvent("handleCommits", () -> {
log.error("handleCommits " + messages + " at offset " + lastOffset);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
dir.create("offset").setContents(String.valueOf(lastOffset));
for (ApiMessage message : messages) {
handleMessage(message);
}
}, null);
}
@Override
public void handleNewLeader(MetaLogLeader leader) {
appendEvent("handleNewLeader", () -> {
log.error("handleNewLeader " + leader);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
dir.create("leader").setContents(leader.toString());
}, null);
}
@Override
public void handleClaim(int epoch) {
// This shouldn't happen because we should never be the leader.
log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
}
@Override
public void handleRenounce(long epoch) {
// This shouldn't happen because we should never be the leader.
log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
}
@Override
public void beginShutdown() {
log.debug("MetaLogListener sent beginShutdown");
}
}
private final Data data = new Data();
private final LogListener logListener = new LogListener();
private final ObjectMapper objectMapper;
private final KafkaEventQueue queue;
public MetadataNodeManager() {
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new Jdk8Module());
this.queue = new KafkaEventQueue(Time.SYSTEM,
new LogContext("[node-manager-event-queue] "), "");
}
public void setup() throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
appendEvent("createShellNodes", () -> {
DirectoryNode directory = data.root().mkdirs("local");
directory.create("version").setContents(AppInfoParser.getVersion());
directory.create("commitId").setContents(AppInfoParser.getCommitId());
future.complete(null);
}, future);
future.get();
}
public LogListener logListener() {
return logListener;
}
@Override
public void close() throws Exception {
queue.close();
}
public void visit(Consumer<Data> consumer) throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
appendEvent("visit", () -> {
consumer.accept(data);
future.complete(null);
}, future);
future.get();
}
private void appendEvent(String name, Runnable runnable, CompletableFuture<?> future) {
queue.append(new EventQueue.Event() {
@Override
public void run() throws Exception {
runnable.run();
}
@Override
public void handleException(Throwable e) {
log.error("Unexpected error while handling event " + name, e);
if (future != null) {
future.completeExceptionally(e);
}
}
});
}
private void handleMessage(ApiMessage message) {
try {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
handleCommitImpl(type, message);
} catch (Exception e) {
log.error("Error processing record of type " + message.apiKey(), e);
}
}
private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
throws Exception {
switch (type) {
case REGISTER_BROKER_RECORD: {
DirectoryNode brokersNode = data.root.mkdirs("brokers");
RegisterBrokerRecord record = (RegisterBrokerRecord) message;
DirectoryNode brokerNode = brokersNode.
mkdirs(Integer.toString(record.brokerId()));
FileNode registrationNode = brokerNode.create("registration");
registrationNode.setContents(record.toString());
brokerNode.create("isFenced").setContents("true");
break;
}
case UNREGISTER_BROKER_RECORD: {
UnregisterBrokerRecord record = (UnregisterBrokerRecord) message;
data.root.rmrf("brokers", Integer.toString(record.brokerId()));
break;
}
case TOPIC_RECORD: {
TopicRecord record = (TopicRecord) message;
DirectoryNode topicsDirectory = data.root.mkdirs("topics");
DirectoryNode topicDirectory = topicsDirectory.mkdirs(record.name());
topicDirectory.create("id").setContents(record.topicId().toString());
topicDirectory.create("name").setContents(record.name().toString());
DirectoryNode topicIdsDirectory = data.root.mkdirs("topicIds");
topicIdsDirectory.addChild(record.topicId().toString(), topicDirectory);
break;
}
case PARTITION_RECORD: {
PartitionRecord record = (PartitionRecord) message;
DirectoryNode topicDirectory =
data.root.mkdirs("topicIds").mkdirs(record.topicId().toString());
DirectoryNode partitionDirectory =
topicDirectory.mkdirs(Integer.toString(record.partitionId()));
JsonNode node = PartitionRecordJsonConverter.
write(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION);
partitionDirectory.create("data").setContents(node.toPrettyString());
break;
}
case CONFIG_RECORD: {
ConfigRecord record = (ConfigRecord) message;
String typeString = "";
switch (ConfigResource.Type.forId(record.resourceType())) {
case BROKER:
typeString = "broker";
break;
case TOPIC:
typeString = "topic";
break;
default:
throw new RuntimeException("Error processing CONFIG_RECORD: " +
"Can't handle ConfigResource.Type " + record.resourceType());
}
DirectoryNode configDirectory = data.root.mkdirs("configs").
mkdirs(typeString).mkdirs(record.resourceName());
if (record.value() == null) {
configDirectory.rmrf(record.name());
} else {
configDirectory.create(record.name()).setContents(record.value());
}
break;
}
case ISR_CHANGE_RECORD: {
IsrChangeRecord record = (IsrChangeRecord) message;
FileNode file = data.root.file("topicIds", record.topicId().toString(),
Integer.toString(record.partitionId()), "data");
JsonNode node = objectMapper.readTree(file.contents());
PartitionRecord partition = PartitionRecordJsonConverter.
read(node, PartitionRecord.HIGHEST_SUPPORTED_VERSION);
partition.setIsr(record.isr());
partition.setLeader(record.leader());
partition.setLeaderEpoch(record.leaderEpoch());
partition.setPartitionEpoch(record.partitionEpoch());
file.setContents(PartitionRecordJsonConverter.write(partition,
PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
break;
}
case FENCE_BROKER_RECORD: {
FenceBrokerRecord record = (FenceBrokerRecord) message;
data.root.mkdirs("brokers", Integer.toString(record.id())).
create("isFenced").setContents("true");
break;
}
case UNFENCE_BROKER_RECORD: {
UnfenceBrokerRecord record = (UnfenceBrokerRecord) message;
data.root.mkdirs("brokers", Integer.toString(record.id())).
create("isFenced").setContents("false");
break;
}
case REMOVE_TOPIC_RECORD: {
RemoveTopicRecord record = (RemoveTopicRecord) message;
DirectoryNode topicsDirectory =
data.root.directory("topicIds", record.topicId().toString());
String name = topicsDirectory.file("name").contents();
data.root.rmrf("topics", name);
data.root.rmrf("topicIds", record.topicId().toString());
break;
}
default:
throw new RuntimeException("Unhandled metadata record type");
}
}
}
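
As an illustration of the mapping above, a TopicRecord for a topic named "foo" (the id below is a hypothetical example) produces these nodes, with the topicIds entry aliased via addChild() to the same DirectoryNode as /topics/foo:

/topics/foo/id    -> "b6cUK9BJTaizYgyNkOfNuA"
/topics/foo/name  -> "foo"
/topicIds/b6cUK9BJTaizYgyNkOfNuA/  (same directory object as /topics/foo)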

174
shell/src/main/java/org/apache/kafka/shell/MetadataShell.java

@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import kafka.raft.KafkaRaftManager;
import kafka.tools.TerseFailure;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
* The Kafka metadata shell.
*/
public final class MetadataShell {
private static final Logger log = LoggerFactory.getLogger(MetadataShell.class);
public static class Builder {
private String snapshotPath;
public Builder setSnapshotPath(String snapshotPath) {
this.snapshotPath = snapshotPath;
return this;
}
public MetadataShell build() throws Exception {
MetadataNodeManager nodeManager = null;
SnapshotFileReader reader = null;
try {
nodeManager = new MetadataNodeManager();
reader = new SnapshotFileReader(snapshotPath, nodeManager.logListener());
return new MetadataShell(null, reader, nodeManager);
} catch (Throwable e) {
log.error("Initialization error", e);
if (reader != null) {
reader.close();
}
if (nodeManager != null) {
nodeManager.close();
}
throw e;
}
}
}
private final KafkaRaftManager<ApiMessageAndVersion> raftManager;
private final SnapshotFileReader snapshotFileReader;
private final MetadataNodeManager nodeManager;
public MetadataShell(KafkaRaftManager<ApiMessageAndVersion> raftManager,
SnapshotFileReader snapshotFileReader,
MetadataNodeManager nodeManager) {
this.raftManager = raftManager;
this.snapshotFileReader = snapshotFileReader;
this.nodeManager = nodeManager;
}
public void run(List<String> args) throws Exception {
nodeManager.setup();
if (raftManager != null) {
raftManager.startup();
raftManager.register(nodeManager.logListener());
} else if (snapshotFileReader != null) {
snapshotFileReader.startup();
} else {
throw new RuntimeException("Expected either a raft manager or snapshot reader");
}
if (args == null || args.isEmpty()) {
// Interactive mode.
try (InteractiveShell shell = new InteractiveShell(nodeManager)) {
shell.runMainLoop();
}
} else {
// Non-interactive mode.
Commands commands = new Commands(false);
try (PrintWriter writer = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(System.out, StandardCharsets.UTF_8)))) {
Commands.Handler handler = commands.parseCommand(args);
handler.run(Optional.empty(), writer, nodeManager);
writer.flush();
}
}
}
public void close() throws Exception {
if (raftManager != null) {
raftManager.shutdown();
}
if (snapshotFileReader != null) {
snapshotFileReader.close();
}
nodeManager.close();
}
public static void main(String[] args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("metadata-tool")
.defaultHelp(true)
.description("The Apache Kafka metadata tool");
parser.addArgument("--snapshot", "-s")
.type(String.class)
.help("The snapshot file to read.");
parser.addArgument("command")
.nargs("*")
.help("The command to run.");
Namespace res = parser.parseArgsOrFail(args);
try {
Builder builder = new Builder();
builder.setSnapshotPath(res.getString("snapshot"));
Path tempDir = Files.createTempDirectory("MetadataShell");
Exit.addShutdownHook("agent-shutdown-hook", () -> {
log.debug("Removing temporary directory " + tempDir.toAbsolutePath().toString());
try {
Utils.delete(tempDir.toFile());
} catch (Exception e) {
log.error("Got exception while removing temporary directory " +
tempDir.toAbsolutePath().toString());
}
});
MetadataShell shell = builder.build();
shell.waitUntilCaughtUp();
try {
shell.run(res.getList("command"));
} finally {
shell.close();
}
Exit.exit(0);
} catch (TerseFailure e) {
System.err.println("Error: " + e.getMessage());
Exit.exit(1);
} catch (Throwable e) {
System.err.println("Unexpected error: " +
(e.getMessage() == null ? "" : e.getMessage()));
e.printStackTrace(System.err);
Exit.exit(1);
}
}
void waitUntilCaughtUp() throws ExecutionException, InterruptedException {
snapshotFileReader.caughtUpFuture().get();
}
}
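
For reference, typical invocations of the argument parser above via the kafka-metadata-shell.sh wrapper look like the following (the snapshot path is a hypothetical example):

# Interactive mode: load the snapshot, then explore it at the >> prompt.
./bin/kafka-metadata-shell.sh --snapshot /tmp/metadata.snapshot

# Non-interactive mode: run a single command against the snapshot and exit.
./bin/kafka-metadata-shell.sh --snapshot /tmp/metadata.snapshot ls /brokers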

43
shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import java.io.PrintWriter;
import java.util.Optional;
/**
* Does nothing.
*/
public final class NoOpCommandHandler implements Commands.Handler {
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) {
}
@Override
public int hashCode() {
return 0;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof NoOpCommandHandler)) return false;
return true;
}
}

30
shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
/**
* An exception that is thrown when a non-directory node is treated like a
* directory.
*/
public class NotDirectoryException extends RuntimeException {
private static final long serialVersionUID = 1L;
public NotDirectoryException() {
super();
}
}

30
shell/src/main/java/org/apache/kafka/shell/NotFileException.java

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
/**
* An exception that is thrown when a non-file node is treated like a
* file.
*/
public class NotFileException extends RuntimeException {
private static final long serialVersionUID = 1L;
public NotFileException() {
super();
}
}

89
shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.jline.reader.Candidate;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
/**
* Implements the pwd command.
*/
public final class PwdCommandHandler implements Commands.Handler {
public final static Commands.Type TYPE = new PwdCommandType();
public static class PwdCommandType implements Commands.Type {
private PwdCommandType() {
}
@Override
public String name() {
return "pwd";
}
@Override
public String description() {
return "Print the current working directory.";
}
@Override
public boolean shellOnly() {
return true;
}
@Override
public void addArguments(ArgumentParser parser) {
// nothing to do
}
@Override
public Commands.Handler createHandler(Namespace namespace) {
return new PwdCommandHandler();
}
@Override
public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
List<Candidate> candidates) throws Exception {
// nothing to do
}
}
@Override
public void run(Optional<InteractiveShell> shell,
PrintWriter writer,
MetadataNodeManager manager) throws Exception {
manager.visit(data -> {
writer.println(data.workingDirectory());
});
}
@Override
public int hashCode() {
return 0;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof PwdCommandHandler)) return false;
return true;
}
}

194
shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java

@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Reads Kafka metadata snapshots.
*/
public final class SnapshotFileReader implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(SnapshotFileReader.class);
private final String snapshotPath;
private final MetaLogListener listener;
private final KafkaEventQueue queue;
private final CompletableFuture<Void> caughtUpFuture;
private FileRecords fileRecords;
private Iterator<FileChannelRecordBatch> batchIterator;
public SnapshotFileReader(String snapshotPath, MetaLogListener listener) {
this.snapshotPath = snapshotPath;
this.listener = listener;
this.queue = new KafkaEventQueue(Time.SYSTEM,
new LogContext("[snapshotReaderQueue] "), "snapshotReaderQueue_");
this.caughtUpFuture = new CompletableFuture<>();
}
public void startup() throws Exception {
CompletableFuture<Void> future = new CompletableFuture<>();
queue.append(new EventQueue.Event() {
@Override
public void run() throws Exception {
fileRecords = FileRecords.open(new File(snapshotPath), false);
batchIterator = fileRecords.batches().iterator();
scheduleHandleNextBatch();
future.complete(null);
}
@Override
public void handleException(Throwable e) {
future.completeExceptionally(e);
beginShutdown("startup error");
}
});
future.get();
}
private void handleNextBatch() {
if (!batchIterator.hasNext()) {
beginShutdown("done");
return;
}
FileChannelRecordBatch batch = batchIterator.next();
if (batch.isControlBatch()) {
handleControlBatch(batch);
} else {
handleMetadataBatch(batch);
}
scheduleHandleNextBatch();
}
private void scheduleHandleNextBatch() {
queue.append(new EventQueue.Event() {
@Override
public void run() throws Exception {
handleNextBatch();
}
@Override
public void handleException(Throwable e) {
log.error("Unexpected error while handling a batch of events", e);
beginShutdown("handleBatch error");
}
});
}
private void handleControlBatch(FileChannelRecordBatch batch) {
for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
Record record = iter.next();
try {
short typeId = ControlRecordType.parseTypeId(record.key());
ControlRecordType type = ControlRecordType.fromTypeId(typeId);
switch (type) {
case LEADER_CHANGE:
LeaderChangeMessage message = new LeaderChangeMessage();
message.read(new ByteBufferAccessor(record.value()), (short) 0);
listener.handleNewLeader(new MetaLogLeader(message.leaderId(),
batch.partitionLeaderEpoch()));
break;
default:
log.error("Ignoring control record with type {} at offset {}",
type, record.offset());
}
} catch (Throwable e) {
log.error("unable to read control record at offset {}", record.offset(), e);
}
}
}
private void handleMetadataBatch(FileChannelRecordBatch batch) {
List<ApiMessage> messages = new ArrayList<>();
for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
Record record = iter.next();
ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
try {
int apiKey = accessor.readUnsignedVarint();
if (apiKey > Short.MAX_VALUE || apiKey < 0) {
throw new RuntimeException("Invalid apiKey value " + apiKey);
}
int apiVersion = accessor.readUnsignedVarint();
if (apiVersion > Short.MAX_VALUE || apiVersion < 0) {
throw new RuntimeException("Invalid apiVersion value " + apiVersion);
}
ApiMessage message = MetadataRecordType.fromId((short) apiKey).newMetadataRecord();
message.read(accessor, (short) apiVersion);
messages.add(message);
} catch (Throwable e) {
log.error("unable to read metadata record at offset {}", record.offset(), e);
}
}
listener.handleCommits(batch.lastOffset(), messages);
}
public void beginShutdown(String reason) {
if (reason.equals("done")) {
caughtUpFuture.complete(null);
} else {
caughtUpFuture.completeExceptionally(new RuntimeException(reason));
}
queue.beginShutdown(reason, new EventQueue.Event() {
@Override
public void run() throws Exception {
listener.beginShutdown();
if (fileRecords != null) {
fileRecords.close();
fileRecords = null;
}
batchIterator = null;
}
@Override
public void handleException(Throwable e) {
log.error("shutdown error", e);
}
});
}
@Override
public void close() throws Exception {
beginShutdown("closing");
queue.close();
}
public CompletableFuture<Void> caughtUpFuture() {
return caughtUpFuture;
}
}
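
The record framing consumed by handleMetadataBatch() above is two unsigned varints (apiKey, then apiVersion) followed by the serialized message body. A standalone sketch of the varint decoding, assuming the LEB128-style encoding that ByteBufferAccessor.readUnsignedVarint() implements:

import java.nio.ByteBuffer;

public class VarintDemo {
    // Decode one unsigned varint: seven payload bits per byte, least
    // significant group first; the high bit marks a continuation byte.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7f) << shift;
            if ((b & 0x80) == 0) return value;
            shift += 7;
        }
    }

    public static void main(String[] args) {
        // 0x96 0x01 decodes to 150: 0x16 | (0x01 << 7).
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) 0x96, 0x01});
        System.out.println(readUnsignedVarint(buf)); // 150
    }
}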

70
shell/src/test/java/org/apache/kafka/shell/CommandTest.java

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
@Timeout(value = 120000, unit = MILLISECONDS)
public class CommandTest {
@Test
public void testParseCommands() {
assertEquals(new CatCommandHandler(Arrays.asList("foo")),
new Commands(true).parseCommand(Arrays.asList("cat", "foo")));
assertEquals(new CdCommandHandler(Optional.empty()),
new Commands(true).parseCommand(Arrays.asList("cd")));
assertEquals(new CdCommandHandler(Optional.of("foo")),
new Commands(true).parseCommand(Arrays.asList("cd", "foo")));
assertEquals(new ExitCommandHandler(),
new Commands(true).parseCommand(Arrays.asList("exit")));
assertEquals(new HelpCommandHandler(),
new Commands(true).parseCommand(Arrays.asList("help")));
assertEquals(new HistoryCommandHandler(3),
new Commands(true).parseCommand(Arrays.asList("history", "3")));
assertEquals(new HistoryCommandHandler(Integer.MAX_VALUE),
new Commands(true).parseCommand(Arrays.asList("history")));
assertEquals(new LsCommandHandler(Collections.emptyList()),
new Commands(true).parseCommand(Arrays.asList("ls")));
assertEquals(new LsCommandHandler(Arrays.asList("abc", "123")),
new Commands(true).parseCommand(Arrays.asList("ls", "abc", "123")));
assertEquals(new PwdCommandHandler(),
new Commands(true).parseCommand(Arrays.asList("pwd")));
}
@Test
public void testParseInvalidCommand() {
assertEquals(new ErroneousCommandHandler("invalid choice: 'blah' (choose " +
"from 'cat', 'cd', 'exit', 'find', 'help', 'history', 'ls', 'man', 'pwd')"),
new Commands(true).parseCommand(Arrays.asList("blah")));
}
@Test
public void testEmptyCommandLine() {
assertEquals(new NoOpCommandHandler(),
new Commands(true).parseCommand(Arrays.asList("")));
assertEquals(new NoOpCommandHandler(),
new Commands(true).parseCommand(Collections.emptyList()));
}
}

37
shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
@Timeout(value = 120000, unit = MILLISECONDS)
public class CommandUtilsTest {
@Test
public void testSplitPath() {
assertEquals(Arrays.asList("alpha", "beta"),
CommandUtils.splitPath("/alpha/beta"));
assertEquals(Arrays.asList("alpha", "beta"),
CommandUtils.splitPath("//alpha/beta/"));
}
}
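
For readers without CommandUtils in front of them, the behavior under test amounts to splitting on '/' and dropping empty components. A minimal sketch of logic that would satisfy these assertions (the actual implementation in CommandUtils may differ):

import java.util.ArrayList;
import java.util.List;

public class SplitPathSketch {
    // Split on '/' and drop the empty components produced by leading,
    // trailing, or doubled slashes.
    static List<String> splitPath(String path) {
        List<String> components = new ArrayList<>();
        for (String component : path.split("/")) {
            if (!component.isEmpty()) {
                components.add(component);
            }
        }
        return components;
    }

    public static void main(String[] args) {
        System.out.println(splitPath("/alpha/beta"));   // [alpha, beta]
        System.out.println(splitPath("//alpha/beta/")); // [alpha, beta]
    }
}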

75
shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@Timeout(value = 120000, unit = MILLISECONDS)
public class GlobComponentTest {
private void verifyIsLiteral(GlobComponent globComponent, String component) {
assertTrue(globComponent.literal());
assertEquals(component, globComponent.component());
assertTrue(globComponent.matches(component));
assertFalse(globComponent.matches(component + "foo"));
}
@Test
public void testLiteralComponent() {
verifyIsLiteral(new GlobComponent("abc"), "abc");
verifyIsLiteral(new GlobComponent(""), "");
verifyIsLiteral(new GlobComponent("foobar_123"), "foobar_123");
verifyIsLiteral(new GlobComponent("$blah+"), "$blah+");
}
@Test
public void testToRegularExpression() {
assertEquals(null, GlobComponent.toRegularExpression("blah"));
assertEquals(null, GlobComponent.toRegularExpression(""));
assertEquals(null, GlobComponent.toRegularExpression("does not need a regex, actually"));
assertEquals("^\\$blah.*$", GlobComponent.toRegularExpression("$blah*"));
assertEquals("^.*$", GlobComponent.toRegularExpression("*"));
assertEquals("^foo(?:(?:bar)|(?:baz))$", GlobComponent.toRegularExpression("foo{bar,baz}"));
}
@Test
public void testGlobMatch() {
GlobComponent star = new GlobComponent("*");
assertFalse(star.literal());
assertTrue(star.matches(""));
assertTrue(star.matches("anything"));
GlobComponent question = new GlobComponent("b?b");
assertFalse(question.literal());
assertFalse(question.matches(""));
assertTrue(question.matches("bob"));
assertTrue(question.matches("bib"));
assertFalse(question.matches("bic"));
GlobComponent foobarOrFoobaz = new GlobComponent("foo{bar,baz}");
assertFalse(foobarOrFoobaz.literal());
assertTrue(foobarOrFoobaz.matches("foobar"));
assertTrue(foobarOrFoobaz.matches("foobaz"));
assertFalse(foobarOrFoobaz.matches("foobah"));
assertFalse(foobarOrFoobaz.matches("foo"));
assertFalse(foobarOrFoobaz.matches("baz"));
}
}

144
shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java

@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

@Timeout(value = 120000, unit = MILLISECONDS)
public class GlobVisitorTest {
    private static final MetadataNodeManager.Data DATA;

    static {
        DATA = new MetadataNodeManager.Data();
        DATA.root().mkdirs("alpha", "beta", "gamma");
        DATA.root().mkdirs("alpha", "theta");
        DATA.root().mkdirs("foo", "a");
        DATA.root().mkdirs("foo", "beta");
        DATA.root().mkdirs("zeta").create("c");
        DATA.root().mkdirs("zeta");
        DATA.root().create("zzz");
        DATA.setWorkingDirectory("foo");
    }
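    /**
     * Accumulates GlobVisitor callbacks: the visitor fires once per matching
     * node, or exactly once with Optional.empty() when the glob matches nothing.
     */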
    static class InfoConsumer implements Consumer<Optional<MetadataNodeInfo>> {
        private Optional<List<MetadataNodeInfo>> infos = null;

        @Override
        public void accept(Optional<MetadataNodeInfo> info) {
            if (infos == null) {
                if (info.isPresent()) {
                    infos = Optional.of(new ArrayList<>());
                    infos.get().add(info.get());
                } else {
                    infos = Optional.empty();
                }
            } else {
                if (info.isPresent()) {
                    infos.get().add(info.get());
                } else {
                    throw new RuntimeException("Saw empty info after seeing non-empty info");
                }
            }
        }
    }

    @Test
    public void testStarGlob() {
        InfoConsumer consumer = new InfoConsumer();
        GlobVisitor visitor = new GlobVisitor("*", consumer);
        visitor.accept(DATA);
        assertEquals(Optional.of(Arrays.asList(
            new MetadataNodeInfo(new String[] {"foo", "a"},
                DATA.root().directory("foo").child("a")),
            new MetadataNodeInfo(new String[] {"foo", "beta"},
                DATA.root().directory("foo").child("beta")))), consumer.infos);
    }

    @Test
    public void testDotDot() {
        InfoConsumer consumer = new InfoConsumer();
        GlobVisitor visitor = new GlobVisitor("..", consumer);
        visitor.accept(DATA);
        assertEquals(Optional.of(Arrays.asList(
            new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos);
    }

    @Test
    public void testDoubleDotDot() {
        InfoConsumer consumer = new InfoConsumer();
        GlobVisitor visitor = new GlobVisitor("../..", consumer);
        visitor.accept(DATA);
        assertEquals(Optional.of(Arrays.asList(
            new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos);
    }

    @Test
    public void testZGlob() {
        InfoConsumer consumer = new InfoConsumer();
        GlobVisitor visitor = new GlobVisitor("../z*", consumer);
        visitor.accept(DATA);
        assertEquals(Optional.of(Arrays.asList(
            new MetadataNodeInfo(new String[] {"zeta"},
                DATA.root().child("zeta")),
            new MetadataNodeInfo(new String[] {"zzz"},
                DATA.root().child("zzz")))), consumer.infos);
    }

    @Test
    public void testBetaOrThetaGlob() {
        InfoConsumer consumer = new InfoConsumer();
        GlobVisitor visitor = new GlobVisitor("../*/{beta,theta}", consumer);
        visitor.accept(DATA);
        assertEquals(Optional.of(Arrays.asList(
            new MetadataNodeInfo(new String[] {"alpha", "beta"},
                DATA.root().directory("alpha").child("beta")),
            new MetadataNodeInfo(new String[] {"alpha", "theta"},
                DATA.root().directory("alpha").child("theta")),
            new MetadataNodeInfo(new String[] {"foo", "beta"},
                DATA.root().directory("foo").child("beta")))), consumer.infos);
    }

    @Test
    public void testNotFoundGlob() {
        InfoConsumer consumer = new InfoConsumer();
        GlobVisitor visitor = new GlobVisitor("epsilon", consumer);
        visitor.accept(DATA);
        assertEquals(Optional.empty(), consumer.infos);
    }

    @Test
    public void testAbsoluteGlob() {
        InfoConsumer consumer = new InfoConsumer();
        GlobVisitor visitor = new GlobVisitor("/a?pha", consumer);
        visitor.accept(DATA);
        assertEquals(Optional.of(Arrays.asList(
            new MetadataNodeInfo(new String[] {"alpha"},
                DATA.root().directory("alpha")))), consumer.infos);
    }
}
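Taken together these cases document the visitor contract: globs resolve against
the manager's working directory ("foo" here), ".." walks toward the root and
clamps there, a leading "/" makes the glob absolute, and the consumer is invoked
once per match, or exactly once with Optional.empty() when nothing matches. A
hypothetical usage in the style of a command handler; the path() accessor on
MetadataNodeInfo is assumed here for illustration:

class GlobVisitorUsageSketch {
    // Hypothetical sketch (path() accessor assumed): print each node whose
    // name matches the glob, or a diagnostic when there are no matches.
    static void printMatches(MetadataNodeManager.Data data, String glob) {
        GlobVisitor visitor = new GlobVisitor(glob, info -> {
            if (info.isPresent()) {
                System.out.println("/" + String.join("/", info.get().path()));
            } else {
                System.out.println(glob + ": no matching nodes");
            }
        });
        visitor.accept(data);  // walks the tree rooted at data.root()
    }
}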

99
shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.kafka.shell.LsCommandHandler.ColumnSchema;
import org.apache.kafka.shell.LsCommandHandler.TargetDirectory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.OptionalInt;

@Timeout(value = 120000, unit = MILLISECONDS)
public class LsCommandHandlerTest {
    @Test
    public void testCalculateColumnSchema() {
        assertEquals(new ColumnSchema(1, 3),
            LsCommandHandler.calculateColumnSchema(OptionalInt.empty(),
                Arrays.asList("abc", "def", "ghi")));
        assertEquals(new ColumnSchema(1, 2),
            LsCommandHandler.calculateColumnSchema(OptionalInt.of(0),
                Arrays.asList("abc", "def")));
        assertEquals(new ColumnSchema(3, 1).setColumnWidths(3, 8, 6),
            LsCommandHandler.calculateColumnSchema(OptionalInt.of(80),
                Arrays.asList("a", "abcdef", "beta")));
        assertEquals(new ColumnSchema(2, 3).setColumnWidths(10, 7),
            LsCommandHandler.calculateColumnSchema(OptionalInt.of(18),
                Arrays.asList("alphabet", "beta", "gamma", "theta", "zeta")));
    }

    @Test
    public void testPrintEntries() throws Exception {
        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
            try (PrintWriter writer = new PrintWriter(new OutputStreamWriter(
                    stream, StandardCharsets.UTF_8))) {
                LsCommandHandler.printEntries(writer, "", OptionalInt.of(18),
                    Arrays.asList("alphabet", "beta", "gamma", "theta", "zeta"));
            }
            assertEquals(String.join(String.format("%n"), Arrays.asList(
                "alphabet  theta",
                "beta      zeta",
                "gamma")), stream.toString().trim());
        }
    }

    @Test
    public void testPrintTargets() throws Exception {
        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
            try (PrintWriter writer = new PrintWriter(new OutputStreamWriter(
                    stream, StandardCharsets.UTF_8))) {
                LsCommandHandler.printTargets(writer, OptionalInt.of(18),
                    Arrays.asList("foo", "foobarbaz", "quux"), Arrays.asList(
                        new TargetDirectory("/some/dir",
                            Collections.singletonList("supercalifragalistic")),
                        new TargetDirectory("/some/other/dir",
                            Arrays.asList("capability", "delegation", "elephant",
                                "fungible", "green"))));
            }
            assertEquals(String.join(String.format("%n"), Arrays.asList(
                "foo        quux",
                "foobarbaz  ",
                "",
                "/some/dir:",
                "supercalifragalistic",
                "",
                "/some/other/dir:",
                "capability",
                "delegation",
                "elephant",
                "fungible",
                "green")), stream.toString().trim());
        }
    }
}
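The expected schemas encode a simple fitting rule: entries flow down columns,
each column is as wide as its longest entry plus two spaces of padding, an
absent screen width forces a single column, and otherwise the handler settles
on the smallest row count whose columns fit the screen, degrading to one
column when nothing fits (the width-0 case). A sketch of that rule under these
assumptions; the committed calculateColumnSchema may differ in details:

import java.util.Arrays;
import java.util.List;

class ColumnFitSketch {
    // Returns the padded column widths for a column-major listing, e.g.
    // {10, 7} for a screen width of 18 and the five names used above.
    static int[] fitColumns(int screenWidth, List<String> entries) {
        for (int rows = 1; rows <= entries.size(); rows++) {
            int cols = (entries.size() + rows - 1) / rows;  // ceiling division
            int[] widths = new int[cols];
            for (int i = 0; i < entries.size(); i++) {
                // Entry i lands in column i / rows (column-major layout).
                widths[i / rows] = Math.max(widths[i / rows],
                    entries.get(i).length() + 2);           // 2-space padding
            }
            if (Arrays.stream(widths).sum() <= screenWidth || cols == 1) {
                return widths;
            }
        }
        return new int[0];  // no entries at all
    }
}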

73
shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java

@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.shell;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.apache.kafka.shell.MetadataNode.FileNode;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;

@Timeout(value = 120000, unit = MILLISECONDS)
public class MetadataNodeTest {
    @Test
    public void testMkdirs() {
        DirectoryNode root = new DirectoryNode();
        DirectoryNode defNode = root.mkdirs("abc", "def");
        DirectoryNode defNode2 = root.mkdirs("abc", "def");
        assertSame(defNode, defNode2);
        DirectoryNode defNode3 = root.directory("abc", "def");
        assertSame(defNode, defNode3);
        root.mkdirs("ghi");
        assertEquals(new HashSet<>(Arrays.asList("abc", "ghi")), root.children().keySet());
        assertEquals(Collections.singleton("def"), root.mkdirs("abc").children().keySet());
        assertEquals(Collections.emptySet(), defNode.children().keySet());
    }

    @Test
    public void testRmrf() {
        DirectoryNode root = new DirectoryNode();
        DirectoryNode foo = root.mkdirs("foo");
        foo.mkdirs("a");
        foo.mkdirs("b");
        root.mkdirs("baz");
        assertEquals(new HashSet<>(Arrays.asList("foo", "baz")), root.children().keySet());
        root.rmrf("foo", "a");
        assertEquals(Collections.singleton("b"), foo.children().keySet());
        root.rmrf("foo");
        assertEquals(Collections.singleton("baz"), root.children().keySet());
    }

    @Test
    public void testCreateFiles() {
        DirectoryNode root = new DirectoryNode();
        DirectoryNode abcdNode = root.mkdirs("abcd");
        FileNode quuxNode = abcdNode.create("quux");
        quuxNode.setContents("quux contents");
        assertEquals("quux contents", quuxNode.contents());
        assertThrows(NotDirectoryException.class, () -> root.mkdirs("abcd", "quux"));
    }
}
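The behavior under test falls out of a small tree of node types: directories
hold named children in sorted order, files hold a string payload, mkdirs is
idempotent (it returns the node that is already there), and creating a
directory through a path blocked by a file fails. A condensed sketch of such a
structure, not the shell module's actual MetadataNode:

import java.util.TreeMap;

class MetadataNodeSketch {
    interface Node { }

    static class FileNode implements Node {
        private String contents = "";
        void setContents(String contents) { this.contents = contents; }
        String contents() { return contents; }
    }

    static class DirectoryNode implements Node {
        private final TreeMap<String, Node> children = new TreeMap<>();

        DirectoryNode mkdirs(String... names) {
            DirectoryNode node = this;
            for (String name : names) {
                Node child = node.children.get(name);
                if (child == null) {
                    DirectoryNode next = new DirectoryNode();
                    node.children.put(name, next);
                    node = next;
                } else if (child instanceof DirectoryNode) {
                    node = (DirectoryNode) child;  // idempotent: reuse the node
                } else {
                    // The real code throws the shell's NotDirectoryException here.
                    throw new IllegalStateException("not a directory: " + name);
                }
            }
            return node;
        }

        FileNode create(String name) {
            FileNode file = new FileNode();
            children.put(name, file);  // sketch: no duplicate-name checking
            return file;
        }

        TreeMap<String, Node> children() {
            return children;
        }
    }
}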