Browse Source

KAFKA-4733: Improve Streams Reset Tool console output

Added general explanation of the tool and what it does. Also added few details to the arguments.

Author: Gwen Shapira <cshapi@gmail.com>

Reviewers: Matthias J. Sax, Michael G. Noll, Guozhang Wang

Closes #2503 from gwenshap/KAFKA-4733
pull/2509/merge
Gwen Shapira 8 years ago committed by Guozhang Wang
parent
commit
2e662a0616
  1. 26
      core/src/main/scala/kafka/tools/StreamsResetter.java

26
core/src/main/scala/kafka/tools/StreamsResetter.java

@ -138,12 +138,12 @@ public class StreamsResetter { @@ -138,12 +138,12 @@ public class StreamsResetter {
.ofType(String.class)
.defaultsTo("localhost:2181")
.describedAs("url");
inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics")
inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics")
intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method). For these topics, the tool will skip to the end.")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
@ -152,7 +152,7 @@ public class StreamsResetter { @@ -152,7 +152,7 @@ public class StreamsResetter {
try {
options = optionParser.parse(args);
} catch (final OptionException e) {
optionParser.printHelpOn(System.err);
printHelp(optionParser);
throw e;
}
}
@ -271,6 +271,26 @@ public class StreamsResetter { @@ -271,6 +271,26 @@ public class StreamsResetter {
&& (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
}
private void printHelp(OptionParser parser) throws IOException {
System.err.println("The Application Reset Tool allows you to quickly reset an application in order to reprocess "
+ "its data from scratch.\n"
+ "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of "
+ "intermediate topics (topics used in the through() method).\n"
+ "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "
+ "\"<application.id>-\").\n"
+ "You do not need to specify internal topics because the tool finds them automatically.\n"
+ "* This tool will not delete output topics (if you want to delete them, you need to do it yourself "
+ "with the bin/kafka-topics.sh command).\n"
+ "* This tool will not clean up the local state on the stream application instances (the persisted "
+ "stores used to cache aggregation results).\n"
+ "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the "
+ "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n\n"
+ "*** Important! You will get wrong output if you don't clean up the local stores after running the "
+ "reset tool!\n\n"
);
parser.printHelpOn(System.err);
}
public static void main(final String[] args) {
Exit.exit(new StreamsResetter().run(args));
}

Loading…
Cancel
Save