From 2e662a061608a0755274da3e2ccf87401936febf Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 8 Feb 2017 00:08:07 -0800 Subject: [PATCH] KAFKA-4733: Improve Streams Reset Tool console output Added general explanation of the tool and what it does. Also added few details to the arguments. Author: Gwen Shapira Reviewers: Matthias J. Sax, Michael G. Noll, Guozhang Wang Closes #2503 from gwenshap/KAFKA-4733 --- .../scala/kafka/tools/StreamsResetter.java | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 0c9f26e019c..a79cd39baef 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -138,12 +138,12 @@ public class StreamsResetter { .ofType(String.class) .defaultsTo("localhost:2181") .describedAs("url"); - inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics") + inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.") .withRequiredArg() .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); - intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics") + intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method). For these topics, the tool will skip to the end.") .withRequiredArg() .ofType(String.class) .withValuesSeparatedBy(',') @@ -152,7 +152,7 @@ public class StreamsResetter { try { options = optionParser.parse(args); } catch (final OptionException e) { - optionParser.printHelpOn(System.err); + printHelp(optionParser); throw e; } } @@ -271,6 +271,26 @@ public class StreamsResetter { && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")); } + private void printHelp(OptionParser parser) throws IOException { + System.err.println("The Application Reset Tool allows you to quickly reset an application in order to reprocess " + + "its data from scratch.\n" + + "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of " + + "intermediate topics (topics used in the through() method).\n" + + "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with " + + "\"-\").\n" + + "You do not need to specify internal topics because the tool finds them automatically.\n" + + "* This tool will not delete output topics (if you want to delete them, you need to do it yourself " + + "with the bin/kafka-topics.sh command).\n" + + "* This tool will not clean up the local state on the stream application instances (the persisted " + + "stores used to cache aggregation results).\n" + + "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the " + + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/ by default).\n\n" + + "*** Important! You will get wrong output if you don't clean up the local stores after running the " + + "reset tool!\n\n" + ); + parser.printHelpOn(System.err); + } + public static void main(final String[] args) { Exit.exit(new StreamsResetter().run(args)); }