
KAFKA-15155: Follow PEP 8 best practice in Python to check if a container is empty (#13974)

Reviewers: Divij Vaidya <diviv@amazon.com>
Branch: pull/13994/head
Author: Yi-Sheng Lien (committed via GitHub)
Commit: b8f3776f24
21 changed files (lines changed in parentheses):

  1. kafka-merge-pr.py (2)
  2. release.py (2)
  3. reviewers.py (6)
  4. tests/kafkatest/benchmarks/core/benchmark_test.py (2)
  5. tests/kafkatest/services/connect.py (4)
  6. tests/kafkatest/services/kafka/kafka.py (4)
  7. tests/kafkatest/services/monitor/http.py (4)
  8. tests/kafkatest/services/monitor/jmx.py (2)
  9. tests/kafkatest/services/streams.py (2)
  10. tests/kafkatest/services/transactional_message_copier.py (2)
  11. tests/kafkatest/services/trogdor/kibosh.py (8)
  12. tests/kafkatest/services/trogdor/trogdor.py (4)
  13. tests/kafkatest/tests/connect/connect_distributed_test.py (2)
  14. tests/kafkatest/tests/connect/connect_rest_test.py (2)
  15. tests/kafkatest/tests/core/get_offset_shell_test.py (2)
  16. tests/kafkatest/tests/core/group_mode_transactions_test.py (4)
  17. tests/kafkatest/tests/core/replication_test.py (4)
  18. tests/kafkatest/tests/core/snapshot_test.py (2)
  19. tests/kafkatest/tests/core/transactions_test.py (4)
  20. tests/kafkatest/tests/streams/streams_broker_bounce_test.py (2)
  21. tests/kafkatest/tests/streams/streams_relational_smoke_test.py (2)
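
The change applies PEP 8's guidance that emptiness of sequences and collections should be tested via truthiness rather than by comparing len() to zero: empty strings, bytes, lists, tuples, dicts, and sets are all falsy. A minimal sketch of the idiom (the example values are invented for illustration, not taken from the patch):

    items = []

    if len(items) == 0:   # discouraged: same meaning, more noise
        print("empty")
    if not items:         # PEP 8 preferred form
        print("empty")

    # The idiom works uniformly across the built-in containers:
    assert not "" and not b"" and not [] and not () and not {} and not set()

One caveat: the two forms are not interchangeable for every type (a NumPy array, for instance, raises on an ambiguous truth-value), but every container touched by this patch is a plain Python list, string, or bytes, where they are equivalent.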

kafka-merge-pr.py (2 lines changed)

@@ -319,7 +319,7 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
 def resolve_jira_issues(title, merge_branches, comment):
     jira_ids = re.findall("%s-[0-9]{4,5}" % CAPITALIZED_PROJECT_NAME, title)

-    if len(jira_ids) == 0:
+    if not jira_ids:
         resolve_jira_issue(merge_branches, comment)
     for jira_id in jira_ids:
         resolve_jira_issue(merge_branches, comment, jira_id)

release.py (2 lines changed)

@@ -91,7 +91,7 @@ def fail(msg):
     sys.exit(1)

 def print_output(output):
-    if output is None or len(output) == 0:
+    if output is None or not output:
         return
     for line in output.split('\n'):
         print(">", line)

reviewers.py (6 lines changed)

@@ -57,7 +57,7 @@ if __name__ == "__main__":
     selected_reviewers = []
     while True:
-        if len(selected_reviewers) != 0:
+        if selected_reviewers:
            print(f"Reviewers so far: {selected_reviewers}")
        user_input = prompt_for_user()
        if user_input is None:
@@ -68,7 +68,7 @@ if __name__ == "__main__":
                candidates.append((reviewer, email, count))
            if len(candidates) == 10:
                break
-        if len(candidates) == 0:
+        if not candidates:
            continue

        print("\nPossible matches (in order of most recent):")
@@ -85,7 +85,7 @@ if __name__ == "__main__":
            print("Invalid selection")
            continue

-    if len(selected_reviewers) != 0:
+    if selected_reviewers:
        out = "\n\nReviewers: "
        out += ", ".join([f"{name} <{email}>" for name, email, _ in selected_reviewers])
        out += "\n"

tests/kafkatest/benchmarks/core/benchmark_test.py (2 lines changed)

@@ -145,7 +145,7 @@ class Benchmark(Test):
         for i in range(nblocks):
             subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
-            if len(subset) == 0:
+            if not subset:
                 summary.append(" Time block %d: (empty)" % i)
                 data[i] = None
             else:

tests/kafkatest/services/connect.py (4 lines changed)

@@ -362,7 +362,7 @@ class ConnectStandaloneService(ConnectServiceBase):
             # The default mode is to wait until the complete startup of the worker
             self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs)

-        if len(self.pids(node)) == 0:
+        if not self.pids(node):
             raise RuntimeError("No process ids recorded")
@@ -416,7 +416,7 @@ class ConnectDistributedService(ConnectServiceBase):
             # The default mode is to wait until the complete startup of the worker
             self.start_and_wait_to_join_group(node, 'distributed', '')

-        if len(self.pids(node)) == 0:
+        if not self.pids(node):
             raise RuntimeError("No process ids recorded")

tests/kafkatest/services/kafka/kafka.py (4 lines changed)

@@ -888,7 +888,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.maybe_setup_client_scram_credentials(node)

         self.start_jmx_tool(self.idx(node), node)
-        if len(self.pids(node)) == 0:
+        if not self.pids(node):
             raise Exception("No process ids recorded on node %s" % node.account.hostname)

     def upgrade_metadata_version(self, new_version):
@@ -956,7 +956,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 self.combined_nodes_started -= 1

         try:
-            wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=timeout_sec,
+            wait_until(lambda: not self.pids(node), timeout_sec=timeout_sec,
                        err_msg="Kafka node failed to stop in %d seconds" % timeout_sec)
         except Exception:
             if node_has_combined_controllers:
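
This hunk, and the similar wait_until changes in the test files below, work because ducktape's wait_until simply evaluates the condition callable for truthiness, so the boolean "not self.pids(node)" is interchangeable with the old length comparison. A simplified stand-in for the polling helper, assumed here purely for illustration (the real implementation lives in ducktape.utils.util):

    import time

    def wait_until(condition, timeout_sec, backoff_sec=0.1, err_msg=""):
        # Poll the condition until it is truthy or the deadline passes.
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            if condition():
                return
            time.sleep(backoff_sec)
        raise TimeoutError(err_msg)

    # Usage mirroring the hunk above (pids is a hypothetical list of process ids):
    pids = []
    wait_until(lambda: not pids, timeout_sec=5, err_msg="process did not stop")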

tests/kafkatest/services/monitor/http.py (4 lines changed)

@@ -216,12 +216,12 @@ class _ReverseForwarder(object):
             r, w, x = select([sock, chan], [], [])
             if sock in r:
                 data = sock.recv(1024)
-                if len(data) == 0:
+                if not data:
                     break
                 chan.send(data)
             if chan in r:
                 data = chan.recv(1024)
-                if len(data) == 0:
+                if not data:
                     break
                 sock.send(data)
         chan.close()
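
In this forwarder loop, recv returning an empty bytes object is the conventional signal that the peer has closed the connection, and empty bytes are falsy, so "if not data:" reads naturally as an end-of-stream check. A self-contained sketch of the same pattern (hypothetical helper, not the service's code):

    import socket

    def pump(src: socket.socket, dst: socket.socket) -> None:
        # Copy bytes from src to dst until src reports EOF.
        while True:
            data = src.recv(1024)
            if not data:  # b"" is falsy: the peer closed the connection
                return
            dst.sendall(data)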

tests/kafkatest/services/monitor/jmx.py (2 lines changed)

@@ -115,7 +115,7 @@ class JmxMixin(object):
         # do not calculate average and maximum of jmx stats until we have read output from all nodes
         # If the service is multithreaded, this means that the results will be aggregated only when the last
         # service finishes
-        if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats):
+        if any(not time_to_stats for time_to_stats in self.jmx_stats):
             return

         start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])

tests/kafkatest/services/streams.py (2 lines changed)

@@ -313,7 +313,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
             node.account.ssh(self.start_cmd(node))
             monitor.wait_until(self.expectedMessage, timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))

-        if len(self.pids(node)) == 0:
+        if not self.pids(node):
             raise RuntimeError("No process ids recorded")

tests/kafkatest/services/transactional_message_copier.py (2 lines changed)

@@ -168,7 +168,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
         for pid in pids:
             node.account.signal(pid, sig)
-        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Node %s: Message Copier failed to stop" % str(node.account))
+        wait_until(lambda: not self.pids(node), timeout_sec=60, err_msg="Node %s: Message Copier failed to stop" % str(node.account))

     def stop_node(self, node, clean_shutdown=True):
         self.kill_node(node, clean_shutdown)

tests/kafkatest/services/trogdor/kibosh.py (8 lines changed)

@@ -45,7 +45,7 @@ class KiboshService(Service):
         :param persist: Where the log files and pid files will be created.
         """
         Service.__init__(self, context, num_nodes=0)
-        if (len(nodes) == 0):
+        if not nodes:
             raise RuntimeError("You must supply at least one node to run the service on.")
         for node in nodes:
             self.nodes.append(node)
@@ -97,11 +97,11 @@ class KiboshService(Service):
                            (self.pidfile_path, self.pidfile_path), allow_fail=True)]

     def wait_node(self, node, timeout_sec=None):
-        return len(self.pids(node)) == 0
+        return not self.pids(node)

     def kibosh_process_running(self, node):
         pids = self.pids(node)
-        if len(pids) == 0:
+        if not pids:
             return True
         return False
@@ -133,7 +133,7 @@ class KiboshService(Service):
         :param node: The node.
         :param spec: An array of FaultSpec objects describing the faults.
         """
-        if len(specs) == 0:
+        if not specs:
             obj_json = "{}"
         else:
             fault_array = [spec.kibosh_message for spec in specs]
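
One further simplification this patch stops short of: after the change, the body of kibosh_process_running is equivalent to a single "return not pids", mirroring the wait_node line just above. An illustrative standalone equivalent (hypothetical name, not part of the patch):

    def process_absent(pids):
        # Same result as: if not pids: return True / return False
        return not pids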

tests/kafkatest/services/trogdor/trogdor.py (4 lines changed)

@@ -176,9 +176,9 @@ class TrogdorService(KafkaPathResolverMixin, Service):
     def wait_node(self, node, timeout_sec=None):
         if self.is_coordinator(node):
-            return len(node.account.java_pids(self.coordinator_class_name())) == 0
+            return not node.account.java_pids(self.coordinator_class_name())
         else:
-            return len(node.account.java_pids(self.agent_class_name())) == 0
+            return not node.account.java_pids(self.agent_class_name())

     def stop_node(self, node):
         """Halt trogdor processes on this node."""
def stop_node(self, node):
"""Halt trogdor processes on this node."""

tests/kafkatest/tests/connect/connect_distributed_test.py (2 lines changed)

@@ -610,7 +610,7 @@ class ConnectDistributedTest(Test):
             for task in range(num_tasks):
                 # Validate source messages
                 src_seqnos = [msg['payload']['seqno'] for msg in src_messages if msg['payload']['task'] == task]
-                if len(src_seqnos) == 0:
+                if not src_seqnos:
                     self.logger.error("No records produced by task " + str(task))
                     errors.append("No records produced by task %d" % (task))
                     success = False

tests/kafkatest/tests/connect/connect_rest_test.py (2 lines changed)

@@ -203,7 +203,7 @@ class ConnectRestApiTest(KafkaTest):
         self.cc.delete_connector("local-file-source")
         self.cc.delete_connector("local-file-sink")
-        wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
+        wait_until(lambda: not self.cc.list_connectors(), timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")

     def validate_output(self, input):
         input_set = set(input)

tests/kafkatest/tests/core/get_offset_shell_test.py (2 lines changed)

@@ -99,7 +99,7 @@ class GetOffsetShellTest(Test):
         offsets = self.kafka.get_offset_shell(**kwargs).split("\n")
         sum = 0
         for offset in offsets:
-            if len(offset) == 0:
+            if not offset:
                 continue
             sum += int(offset.split(":")[-1])
         return sum
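
Note that splitting shell output on "\n" leaves an empty string for a trailing newline, and empty strings are falsy, so "if not offset: continue" skips exactly those blank entries. A compact illustration of the same aggregation (made-up sample output):

    output = "topic:0:5\ntopic:1:7\n"
    total = sum(int(line.split(":")[-1]) for line in output.split("\n") if line)
    assert total == 12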

tests/kafkatest/tests/core/group_mode_transactions_test.py (4 lines changed)

@@ -99,12 +99,12 @@ class GroupModeTransactionsTest(Test):
         self.kafka.stop_node(node, clean_shutdown = False)
         gracePeriodSecs = 5
         if self.zk:
-            wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
+            wait_until(lambda: not self.kafka.pids(node) and not self.kafka.is_registered(node),
                        timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs,
                        err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
         else:
             brokerSessionTimeoutSecs = 18
-            wait_until(lambda: len(self.kafka.pids(node)) == 0,
+            wait_until(lambda: not self.kafka.pids(node),
                        timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
                        err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
             time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)

tests/kafkatest/tests/core/replication_test.py (4 lines changed)

@@ -69,12 +69,12 @@ def hard_bounce(test, broker_type):
             gracePeriodSecs = 5
             if test.zk:
-                wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
+                wait_until(lambda: not test.kafka.pids(prev_broker_node) and not test.kafka.is_registered(prev_broker_node),
                            timeout_sec=test.kafka.zk_session_timeout + gracePeriodSecs,
                            err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
             else:
                 brokerSessionTimeoutSecs = 18
-                wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0,
+                wait_until(lambda: not test.kafka.pids(prev_broker_node),
                            timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
                            err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(prev_broker_node.account))
                 time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)

tests/kafkatest/tests/core/snapshot_test.py (2 lines changed)

@@ -118,7 +118,7 @@ class TestSnapshots(ProduceConsumeValidateTest):
         cmd = "ls %s" % file_path
         files = node.account.ssh_output(cmd, allow_fail=True, combine_stderr=False)

-        if len(files) == 0:
+        if not files:
             self.logger.debug("File %s does not exist" % file_path)
             return False
         else:
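
Assuming ducktape's ssh_output returns the raw command output as bytes here, the idiom is unaffected by the type: empty bytes are just as falsy as empty strings.

    assert not b"" and not ""  # both empty forms are falsy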

tests/kafkatest/tests/core/transactions_test.py (4 lines changed)

@@ -97,12 +97,12 @@ class TransactionsTest(Test):
         self.kafka.stop_node(node, clean_shutdown = False)
         gracePeriodSecs = 5
         if self.zk:
-            wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
+            wait_until(lambda: not self.kafka.pids(node) and not self.kafka.is_registered(node),
                        timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs,
                        err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
         else:
             brokerSessionTimeoutSecs = 18
-            wait_until(lambda: len(self.kafka.pids(node)) == 0,
+            wait_until(lambda: not self.kafka.pids(node),
                        timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
                        err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
             time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)

tests/kafkatest/tests/streams/streams_broker_bounce_test.py (2 lines changed)

@@ -67,7 +67,7 @@ def hard_bounce(test, topic, broker_type):
         # Since this is a hard kill, we need to make sure the process is down and that
         # zookeeper has registered the loss by expiring the broker's session timeout.
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and
+        wait_until(lambda: not test.kafka.pids(prev_broker_node) and
                    not (quorum.for_test(test.test_context) == quorum.zk and test.kafka.is_registered(prev_broker_node)),
                    timeout_sec=test.kafka.zk_session_timeout + 5,
                    err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))

tests/kafkatest/tests/streams/streams_relational_smoke_test.py (2 lines changed)

@@ -60,7 +60,7 @@ class StreamsRelationalSmokeTestService(StreamsTestBaseService):
         self.logger.info("Starting process on " + str(node.account))
         node.account.ssh(self.start_cmd(node))

-        if len(self.pids(node)) == 0:
+        if not self.pids(node):
             raise RuntimeError("No process ids recorded")

     def await_command(self, command):
