Browse Source

MINOR: move RecordReader from org.apache.kafka.tools (client module) to org.apache.kafka.tools.api (tools-api module) (#13454)

Reviewers: Jun Rao <junrao@gmail.com>
pull/13527/head
Chia-Ping Tsai 2 years ago committed by GitHub
parent
commit
637bc92ba1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 59
      build.gradle
  2. 2
      core/src/main/scala/kafka/common/MessageReader.scala
  3. 4
      core/src/main/scala/kafka/tools/ConsoleProducer.scala
  4. 2
      core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
  5. 1
      settings.gradle
  6. 2
      tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java
  7. 20
      tools/tools-api/src/main/java/org/apache/kafka/tools/api/package-info.java
  8. 33
      tools/tools-api/src/test/java/org/apache/kafka/tools/api/RecordReaderTest.java

59
build.gradle

@ -858,6 +858,7 @@ project(':core') {
implementation project(':group-coordinator') implementation project(':group-coordinator')
implementation project(':metadata') implementation project(':metadata')
implementation project(':storage:api') implementation project(':storage:api')
implementation project(':tools:tools-api')
implementation project(':raft') implementation project(':raft')
implementation project(':storage') implementation project(':storage')
@ -1106,6 +1107,8 @@ project(':core') {
from(project(':streams:test-utils').configurations.runtimeClasspath) { into("libs/") } from(project(':streams:test-utils').configurations.runtimeClasspath) { into("libs/") }
from(project(':streams:examples').jar) { into("libs/") } from(project(':streams:examples').jar) { into("libs/") }
from(project(':streams:examples').configurations.runtimeClasspath) { into("libs/") } from(project(':streams:examples').configurations.runtimeClasspath) { into("libs/") }
from(project(':tools:tools-api').jar) { into("libs/") }
from(project(':tools:tools-api').configurations.runtimeClasspath) { into("libs/") }
duplicatesStrategy 'exclude' duplicatesStrategy 'exclude'
} }
@ -1766,6 +1769,61 @@ project(':storage') {
} }
} }
project(':tools:tools-api') {
archivesBaseName = "kafka-tools-api"
dependencies {
implementation project(':clients')
testImplementation libs.junitJupiter
}
task createVersionFile() {
def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
inputs.property "commitId", commitId
inputs.property "version", version
outputs.file receiptFile
doLast {
def data = [
commitId: commitId,
version: version,
]
receiptFile.parentFile.mkdirs()
def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
receiptFile.setText(content, "ISO-8859-1")
}
}
sourceSets {
main {
java {
srcDirs = ["src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
}
}
}
jar {
dependsOn createVersionFile
from("$buildDir") {
include "kafka/$buildVersionFileName"
}
}
clean.doFirst {
delete "$buildDir/kafka/"
}
javadoc {
include "**/org/apache/kafka/tools/api/*"
}
}
project(':tools') { project(':tools') {
archivesBaseName = "kafka-tools" archivesBaseName = "kafka-tools"
@ -1773,6 +1831,7 @@ project(':tools') {
implementation project(':clients') implementation project(':clients')
implementation project(':server-common') implementation project(':server-common')
implementation project(':log4j-appender') implementation project(':log4j-appender')
implementation project(':tools:tools-api')
implementation libs.argparse4j implementation libs.argparse4j
implementation libs.jacksonDatabind implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes implementation libs.jacksonJDK8Datatypes

2
core/src/main/scala/kafka/common/MessageReader.scala

@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
* This is used by the `ConsoleProducer`. * This is used by the `ConsoleProducer`.
*/ */
@deprecated("This class has been deprecated and will be removed in 4.0." + @deprecated("This class has been deprecated and will be removed in 4.0." +
"Please use org.apache.kafka.tools.RecordReader instead", "3.5.0") "Please use org.apache.kafka.tools.api.RecordReader instead", "3.5.0")
trait MessageReader { trait MessageReader {
def init(inputStream: InputStream, props: Properties): Unit = {} def init(inputStream: InputStream, props: Properties): Unit = {}

4
core/src/main/scala/kafka/tools/ConsoleProducer.scala

@ -31,7 +31,7 @@ import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.tools.RecordReader import org.apache.kafka.tools.api.RecordReader
import scala.annotation.nowarn import scala.annotation.nowarn
@ -45,7 +45,7 @@ object ConsoleProducer extends Logging {
r.configure(prop.asInstanceOf[java.util.Map[String, _]]) r.configure(prop.asInstanceOf[java.util.Map[String, _]])
r r
case r: MessageReader => case r: MessageReader =>
logger.warn("MessageReader is deprecated. Please use org.apache.kafka.tools.RecordReader instead") logger.warn("MessageReader is deprecated. Please use org.apache.kafka.tools.api.RecordReader instead")
new RecordReader { new RecordReader {
private[this] var initialized = false private[this] var initialized = false

2
core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala

@ -23,7 +23,7 @@ import java.nio.file.Files
import kafka.tools.ConsoleProducer.LineMessageReader import kafka.tools.ConsoleProducer.LineMessageReader
import kafka.utils.{Exit, TestUtils} import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord} import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
import org.apache.kafka.tools.RecordReader import org.apache.kafka.tools.api.RecordReader
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.mockito.Mockito import org.mockito.Mockito

1
settings.gradle

@ -58,6 +58,7 @@ include 'clients',
'streams:upgrade-system-tests-32', 'streams:upgrade-system-tests-32',
'streams:upgrade-system-tests-33', 'streams:upgrade-system-tests-33',
'tools', 'tools',
'tools:tools-api',
'trogdor' 'trogdor'
rootProject.name = 'kafka' rootProject.name = 'kafka'

2
clients/src/main/java/org/apache/kafka/tools/RecordReader.java → tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.kafka.tools; package org.apache.kafka.tools.api;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable; import org.apache.kafka.common.Configurable;

20
tools/tools-api/src/main/java/org/apache/kafka/tools/api/package-info.java

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides interfaces for writing plugins of kafka tools
*/
package org.apache.kafka.tools.api;

33
tools/tools-api/src/test/java/org/apache/kafka/tools/api/RecordReaderTest.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.api;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
public class RecordReaderTest {
@Test
void testDefaultCloseAndConfigure() {
RecordReader reader = inputStream -> null;
// `configure` and `close` should have default empty body
Assertions.assertDoesNotThrow(() -> reader.configure(Collections.emptyMap()));
Assertions.assertDoesNotThrow(reader::close);
}
}
Loading…
Cancel
Save