diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py index d8ffa356a82..d249de6603f 100644 --- a/tests/kafkatest/services/verifiable_client.py +++ b/tests/kafkatest/services/verifiable_client.py @@ -228,7 +228,7 @@ class VerifiableClientJava (VerifiableClientMixin): """ super(VerifiableClientJava, self).__init__() self.parent = parent - self.java_class_name = self.parent.__class__.__name__ + self.java_class_name = parent.java_class_name() self.conf = conf def exec_cmd (self, node): diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 735a7bf0b60..0d60dd102c4 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -129,8 +129,15 @@ class ConsumerEventHandler(object): return None - class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService): + """This service wraps org.apache.kafka.tools.VerifiableConsumer for use in + system testing. + + NOTE: this class should be treated as a PUBLIC API. Downstream users use + this service both directly and through class extension, so care must be + taken to ensure compatibility. + """ + PERSISTENT_ROOT = "/mnt/verifiable_consumer" STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout") STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stderr") @@ -175,6 +182,9 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou for node in self.nodes: node.version = version + def java_class_name(self): + return "VerifiableConsumer" + def _worker(self, idx, node): with self.lock: if node not in self.event_handlers: diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index e32a3def921..859e3c483d6 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -28,9 +28,15 @@ from kafkatest.utils import is_int, is_int_with_prefix from kafkatest.version import DEV_BRANCH from kafkatest.utils.remote_account import line_count +class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService): + """This service wraps org.apache.kafka.tools.VerifiableProducer for use in + system testing. + NOTE: this class should be treated as a PUBLIC API. Downstream users use + this service both directly and through class extension, so care must be + taken to ensure compatibility. + """ -class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService): PERSISTENT_ROOT = "/mnt/verifiable_producer" STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout") STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr") @@ -87,6 +93,9 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou self.stop_timeout_sec = stop_timeout_sec self.request_timeout_sec = request_timeout_sec + def java_class_name(self): + return "VerifiableProducer" + def prop_file(self, node): idx = self.idx(node) prop_file = str(self.security_config)