Browse Source

KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime (#13313)

Reviewers: Ismael Juma <ismael@juma.me.uk>
pull/13504/merge
Greg Harris 1 year ago committed by GitHub
parent
commit
125dbb9286
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      build.gradle
  2. 2
      checkstyle/import-control.xml
  3. 2
      clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
  4. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
  5. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
  6. 2
      connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
  7. 7
      tests/kafkatest/directory_layout/kafka_path.py
  8. 2
      tests/kafkatest/sanity_checks/test_verifiable_producer.py
  9. 9
      tests/kafkatest/services/verifiable_client.py
  10. 1
      tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
  11. 1
      tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java

2
build.gradle

@ -2742,8 +2742,6 @@ project(':connect:runtime') { @@ -2742,8 +2742,6 @@ project(':connect:runtime') {
api project(':connect:json')
api project(':connect:transforms')
implementation project(':tools')
implementation libs.slf4jApi
implementation libs.log4j
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation

2
checkstyle/import-control.xml

@ -561,7 +561,6 @@ @@ -561,7 +561,6 @@
<subpackage name="integration">
<allow pkg="org.apache.kafka.connect.util.clusters" />
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="javax.ws.rs" />
<allow pkg="org.apache.http"/>
<allow pkg="org.eclipse.jetty.util"/>
@ -584,7 +583,6 @@ @@ -584,7 +583,6 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="com.fasterxml.jackson" />
</subpackage>

2
tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java → clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java

@ -14,7 +14,7 @@ @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
package org.apache.kafka.common.utils;
/**

2
connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java

@ -22,7 +22,7 @@ import org.apache.kafka.connect.data.Struct; @@ -22,7 +22,7 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.tools.ThroughputThrottler;
import org.apache.kafka.common.utils.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

2
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java

@ -20,7 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; @@ -20,7 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.tools.ThroughputThrottler;
import org.apache.kafka.common.utils.ThroughputThrottler;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;

2
connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java

@ -27,7 +27,7 @@ import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; @@ -27,7 +27,7 @@ import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.tools.ThroughputThrottler;
import org.apache.kafka.common.utils.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

7
tests/kafkatest/directory_layout/kafka_path.py

@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
import importlib
import os
from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH
from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH, LATEST_0_9
"""This module serves a few purposes:
@ -49,6 +49,11 @@ JARS = { @@ -49,6 +49,11 @@ JARS = {
CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar",
TOOLS_JAR_NAME: "tools/build/libs/kafka-tools*.jar",
TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "tools/build/dependant-libs*/*.jar"
},
# TODO: This is only used in 0.8.2.x system tests, remove with KAFKA-14762
LATEST_0_9.__str__(): {
TOOLS_JAR_NAME: "libs/kafka-tools*.jar",
TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/{argparse4j,jackson}*.jar"
}
}

2
tests/kafkatest/sanity_checks/test_verifiable_producer.py

@ -92,7 +92,7 @@ class TestVerifiableProducer(Test): @@ -92,7 +92,7 @@ class TestVerifiableProducer(Test):
# Easy fix is to decrease throughput= above, the good fix is to make the producer
# not terminate until explicitly killed in this case.
if node.version <= LATEST_0_8_2:
assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring], logger=self.logger)
assert is_version(node, [node.version.vstring, LATEST_0_9.vstring], logger=self.logger)
else:
assert is_version(node, [node.version.vstring], logger=self.logger)

9
tests/kafkatest/services/verifiable_client.py

@ -14,7 +14,7 @@ @@ -14,7 +14,7 @@
# limitations under the License.
from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9
from ducktape.cluster.remoteaccount import RemoteCommandError
import importlib
@ -252,9 +252,10 @@ class VerifiableClientJava (VerifiableClient): @@ -252,9 +252,10 @@ class VerifiableClientJava (VerifiableClient):
cmd = ""
if self.java_class_name == 'VerifiableProducer' and node.version <= LATEST_0_8_2:
# 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
# the tools jar from trunk to the classpath
tools_jar = self.parent.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
tools_dependant_libs_jar = self.parent.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
# the tools jar from 0.9.x to the classpath
# TODO remove with KAFKA-14762
tools_jar = self.parent.path.jar(TOOLS_JAR_NAME, LATEST_0_9)
tools_dependant_libs_jar = self.parent.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, LATEST_0_9)
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
cmd += "export CLASSPATH; "

1
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java

@ -42,6 +42,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParser; @@ -42,6 +42,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.ThroughputThrottler;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.ToolsUtils;

1
tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java

@ -34,6 +34,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; @@ -34,6 +34,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.ThroughputThrottler;
import java.io.IOException;
import java.io.InputStream;

Loading…
Cancel
Save