diff --git a/bin/kafka-streams-application-reset.sh b/bin/kafka-streams-application-reset.sh index 33637325400..26ab7667137 100755 --- a/bin/kafka-streams-application-reset.sh +++ b/bin/kafka-streams-application-reset.sh @@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M" fi -exec $(dirname $0)/kafka-run-class.sh kafka.tools.StreamsResetter "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.StreamsResetter "$@" diff --git a/bin/windows/kafka-streams-application-reset.bat b/bin/windows/kafka-streams-application-reset.bat index 1cfb6f518c8..77ffc7d599e 100644 --- a/bin/windows/kafka-streams-application-reset.bat +++ b/bin/windows/kafka-streams-application-reset.bat @@ -19,5 +19,5 @@ IF ["%KAFKA_HEAP_OPTS%"] EQU [""] ( set KAFKA_HEAP_OPTS=-Xmx512M ) -"%~dp0kafka-run-class.bat" kafka.tools.StreamsResetter %* +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.StreamsResetter %* EndLocal diff --git a/build.gradle b/build.gradle index 9b50246ef76..32c95786d9d 100644 --- a/build.gradle +++ b/build.gradle @@ -1934,6 +1934,7 @@ project(':streams') { testImplementation project(':clients').sourceSets.test.output testImplementation project(':core') + testImplementation project(':tools') testImplementation project(':core').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output testImplementation libs.log4j diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index b7a55201106..6dbeb3259a5 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -425,6 +425,8 @@ + + @@ -483,6 +485,7 @@ + @@ -490,7 +493,7 @@ - + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index eee8772ca42..72af7db2e8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -611,7 +611,7 @@ public class InternalTopicManager { if (!existedTopicPartition.get(topicName).equals(numberOfPartitions.get())) { final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " + "expected: %d; actual: %d. " + - "Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", + "Use 'org.apache.kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", topicName, numberOfPartitions.get(), existedTopicPartition.get(topicName)); log.error(errorMsg); throw new StreamsException(errorMsg); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index 9a923e318ae..dd1348c6351 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.integration; -import kafka.tools.StreamsResetter; +import org.apache.kafka.tools.StreamsResetter; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.ConsumerConfig; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index daeea07bfcc..235cb4766df 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.integration; import kafka.server.KafkaConfig$; -import kafka.tools.StreamsResetter; +import org.apache.kafka.tools.StreamsResetter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.KafkaStreams; diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java index dc3cf65b2cf..e0c3204ae23 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.tools; -import kafka.tools.StreamsResetter; +import org.apache.kafka.tools.StreamsResetter; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 65b3634b805..9e322b9d04c 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -522,7 +522,7 @@ class StreamsResetter(StreamsTestBaseService): def __init__(self, test_context, kafka, topic, applicationId): super(StreamsResetter, self).__init__(test_context, kafka, - "kafka.tools.StreamsResetter", + "org.apache.kafka.tools.StreamsResetter", "") self.topic = topic self.applicationId = applicationId diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java similarity index 99% rename from core/src/main/scala/kafka/tools/StreamsResetter.java rename to tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java index 1b52b2d2ec0..b50de831de4 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.tools; +package org.apache.kafka.tools; import joptsimple.OptionException; import joptsimple.OptionParser;