Browse Source

MINOR: Disable JmxTool in kafkatest console-consumer by default (#7785)

Do not initialize `JmxTool` by default when running console consumer. In order to support this, we remove `has_partitions_assigned` and its only usage in an assertion inside `ProduceConsumeValidateTest`, which did not seem to contribute much to the validation.

Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
pull/7930/head
Brian Bushree 5 years ago committed by Jason Gustafson
parent
commit
422bc1f0fa
  1. 26
      tests/kafkatest/services/console_consumer.py
  2. 12
      tests/kafkatest/tests/produce_consume_validate.py

26
tests/kafkatest/services/console_consumer.py

@ -249,7 +249,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) @@ -249,7 +249,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
with self.lock:
self._init_jmx_attributes()
self.logger.debug("collecting following jmx objects: %s", self.jmx_object_names)
self.start_jmx_tool(idx, node)
@ -292,28 +291,3 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) @@ -292,28 +291,3 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
def java_class_name(self):
return "ConsoleConsumer"
def has_partitions_assigned(self, node):
if self.new_consumer is False:
return False
idx = self.idx(node)
with self.lock:
self._init_jmx_attributes()
self.start_jmx_tool(idx, node)
self.read_jmx_output(idx, node)
if not self.assigned_partitions_jmx_attr in self.maximum_jmx_value:
return False
self.logger.debug("Number of partitions assigned %f" % self.maximum_jmx_value[self.assigned_partitions_jmx_attr])
return self.maximum_jmx_value[self.assigned_partitions_jmx_attr] > 0.0
def _init_jmx_attributes(self):
# Must hold lock
if self.new_consumer:
# We use a flag to track whether we're using this automatically generated ID because the service could be
# restarted multiple times and the client ID may be changed.
if getattr(self, '_automatic_metrics', False) or not self.jmx_object_names:
self._automatic_metrics = True
self.jmx_object_names = ["kafka.consumer:type=consumer-coordinator-metrics,client-id=%s" % self.client_id]
self.jmx_attributes = ["assigned-partitions"]
self.assigned_partitions_jmx_attr = "kafka.consumer:type=consumer-coordinator-metrics,client-id=%s:assigned-partitions" % self.client_id

12
tests/kafkatest/tests/produce_consume_validate.py

@ -18,8 +18,6 @@ from ducktape.utils.util import wait_until @@ -18,8 +18,6 @@ from ducktape.utils.util import wait_until
from kafkatest.utils import validate_delivery
import time
class ProduceConsumeValidateTest(Test):
"""This class provides a shared template for tests which follow the common pattern of:
@ -56,20 +54,10 @@ class ProduceConsumeValidateTest(Test): @@ -56,20 +54,10 @@ class ProduceConsumeValidateTest(Test):
if (self.consumer_init_timeout_sec > 0):
self.logger.debug("Waiting %ds for the consumer to initialize.",
self.consumer_init_timeout_sec)
start = int(time.time())
wait_until(lambda: self.consumer.alive(self.consumer.nodes[0]) is True,
timeout_sec=self.consumer_init_timeout_sec,
err_msg="Consumer process took more than %d s to fork" %\
self.consumer_init_timeout_sec)
end = int(time.time())
remaining_time = self.consumer_init_timeout_sec - (end - start)
if remaining_time < 0 :
remaining_time = 0
if self.consumer.new_consumer:
wait_until(lambda: self.consumer.has_partitions_assigned(self.consumer.nodes[0]) is True,
timeout_sec=remaining_time,
err_msg="Consumer process took more than %d s to have partitions assigned" %\
remaining_time)
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5,

Loading…
Cancel
Save