diff --git a/build.gradle b/build.gradle index 32c95786d9d..a4ea31ceed5 100644 --- a/build.gradle +++ b/build.gradle @@ -858,6 +858,7 @@ project(':core') { implementation project(':group-coordinator') implementation project(':metadata') implementation project(':storage:api') + implementation project(':tools:tools-api') implementation project(':raft') implementation project(':storage') @@ -1106,6 +1107,8 @@ project(':core') { from(project(':streams:test-utils').configurations.runtimeClasspath) { into("libs/") } from(project(':streams:examples').jar) { into("libs/") } from(project(':streams:examples').configurations.runtimeClasspath) { into("libs/") } + from(project(':tools:tools-api').jar) { into("libs/") } + from(project(':tools:tools-api').configurations.runtimeClasspath) { into("libs/") } duplicatesStrategy 'exclude' } @@ -1766,6 +1769,61 @@ project(':storage') { } } +project(':tools:tools-api') { + archivesBaseName = "kafka-tools-api" + + dependencies { + implementation project(':clients') + testImplementation libs.junitJupiter + } + + task createVersionFile() { + def receiptFile = file("$buildDir/kafka/$buildVersionFileName") + inputs.property "commitId", commitId + inputs.property "version", version + outputs.file receiptFile + + doLast { + def data = [ + commitId: commitId, + version: version, + ] + + receiptFile.parentFile.mkdirs() + def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n") + receiptFile.setText(content, "ISO-8859-1") + } + } + + sourceSets { + main { + java { + srcDirs = ["src/main/java"] + } + } + test { + java { + srcDirs = ["src/test/java"] + } + } + } + + jar { + dependsOn createVersionFile + from("$buildDir") { + include "kafka/$buildVersionFileName" + } + } + + clean.doFirst { + delete "$buildDir/kafka/" + } + + javadoc { + include "**/org/apache/kafka/tools/api/*" + } +} + project(':tools') { archivesBaseName = "kafka-tools" @@ -1773,6 +1831,7 @@ project(':tools') { implementation project(':clients') implementation project(':server-common') implementation project(':log4j-appender') + implementation project(':tools:tools-api') implementation libs.argparse4j implementation libs.jacksonDatabind implementation libs.jacksonJDK8Datatypes diff --git a/core/src/main/scala/kafka/common/MessageReader.scala b/core/src/main/scala/kafka/common/MessageReader.scala index 304e15f3204..37c17c6c916 100644 --- a/core/src/main/scala/kafka/common/MessageReader.scala +++ b/core/src/main/scala/kafka/common/MessageReader.scala @@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord * This is used by the `ConsoleProducer`. */ @deprecated("This class has been deprecated and will be removed in 4.0." + - "Please use org.apache.kafka.tools.RecordReader instead", "3.5.0") + "Please use org.apache.kafka.tools.api.RecordReader instead", "3.5.0") trait MessageReader { def init(inputStream: InputStream, props: Properties): Unit = {} diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index a11905fdf96..37a8c807c80 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.KafkaException import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} -import org.apache.kafka.tools.RecordReader +import org.apache.kafka.tools.api.RecordReader import scala.annotation.nowarn @@ -45,7 +45,7 @@ object ConsoleProducer extends Logging { r.configure(prop.asInstanceOf[java.util.Map[String, _]]) r case r: MessageReader => - logger.warn("MessageReader is deprecated. Please use org.apache.kafka.tools.RecordReader instead") + logger.warn("MessageReader is deprecated. Please use org.apache.kafka.tools.api.RecordReader instead") new RecordReader { private[this] var initialized = false diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala index 5e0a941f855..9ef8f0a874f 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala @@ -23,7 +23,7 @@ import java.nio.file.Files import kafka.tools.ConsoleProducer.LineMessageReader import kafka.utils.{Exit, TestUtils} import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord} -import org.apache.kafka.tools.RecordReader +import org.apache.kafka.tools.api.RecordReader import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Test import org.mockito.Mockito diff --git a/settings.gradle b/settings.gradle index f0dceb845ba..871c57a887c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -58,6 +58,7 @@ include 'clients', 'streams:upgrade-system-tests-32', 'streams:upgrade-system-tests-33', 'tools', + 'tools:tools-api', 'trogdor' rootProject.name = 'kafka' diff --git a/clients/src/main/java/org/apache/kafka/tools/RecordReader.java b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java similarity index 98% rename from clients/src/main/java/org/apache/kafka/tools/RecordReader.java rename to tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java index 635dec6ee03..cda150064ad 100644 --- a/clients/src/main/java/org/apache/kafka/tools/RecordReader.java +++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.tools; +package org.apache.kafka.tools.api; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Configurable; diff --git a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/package-info.java b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/package-info.java new file mode 100644 index 00000000000..cd1129e09aa --- /dev/null +++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides interfaces for writing plugins of kafka tools + */ +package org.apache.kafka.tools.api; \ No newline at end of file diff --git a/tools/tools-api/src/test/java/org/apache/kafka/tools/api/RecordReaderTest.java b/tools/tools-api/src/test/java/org/apache/kafka/tools/api/RecordReaderTest.java new file mode 100644 index 00000000000..4c2145d7ffe --- /dev/null +++ b/tools/tools-api/src/test/java/org/apache/kafka/tools/api/RecordReaderTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.api; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +public class RecordReaderTest { + + @Test + void testDefaultCloseAndConfigure() { + RecordReader reader = inputStream -> null; + // `configure` and `close` should have default empty body + Assertions.assertDoesNotThrow(() -> reader.configure(Collections.emptyMap())); + Assertions.assertDoesNotThrow(reader::close); + } +}