Browse Source

MINOR: fixes on streams upgrade test (#5754)

1. In test_upgrade_downgrade_brokers, allow duplicates to happen.
2. In test_version_probing_upgrade, grep the generation numbers from brokers at the end, and check if they can ever be synchronized.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
pull/5797/head
Guozhang Wang 6 years ago committed by GitHub
parent
commit
2d77746a7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 52
      tests/kafkatest/tests/streams/streams_upgrade_test.py

52
tests/kafkatest/tests/streams/streams_upgrade_test.py

@ -122,7 +122,7 @@ class StreamsUpgradeTest(Test):
self.processor1.stop() self.processor1.stop()
node = self.driver.node node = self.driver.node
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) node.account.ssh("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
@matrix(from_version=metadata_2_versions, to_version=metadata_2_versions) @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
@ -470,8 +470,6 @@ class StreamsUpgradeTest(Test):
self.old_processors.remove(processor) self.old_processors.remove(processor)
self.upgraded_processors.append(processor) self.upgraded_processors.append(processor)
current_generation = current_generation + 1
log_monitor.wait_until("Kafka version : " + str(DEV_VERSION), log_monitor.wait_until("Kafka version : " + str(DEV_VERSION),
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + str(DEV_VERSION) + " in " + str(node.account)) err_msg="Could not detect Kafka Streams version " + str(DEV_VERSION) + " in " + str(node.account))
@ -480,16 +478,6 @@ class StreamsUpgradeTest(Test):
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect FutureStreamsPartitionAssignor in " + str(node.account)) err_msg="Could not detect FutureStreamsPartitionAssignor in " + str(node.account))
log_monitor.wait_until("Successfully joined group with generation " + str(current_generation),
timeout_sec=60,
err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(node.account))
first_other_monitor.wait_until("Successfully joined group with generation " + str(current_generation),
timeout_sec=60,
err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(first_other_node.account))
second_other_monitor.wait_until("Successfully joined group with generation " + str(current_generation),
timeout_sec=60,
err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(second_other_node.account))
if processor == self.leader: if processor == self.leader:
self.update_leader() self.update_leader()
else: else:
@ -533,12 +521,34 @@ class StreamsUpgradeTest(Test):
err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account)) err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
# version probing should trigger second rebalance # version probing should trigger second rebalance
current_generation = current_generation + 1 # now we check that after consecutive rebalances we have synchronized generation
generation_synchronized = False
retries = 0
for p in self.processors: while retries < 10:
monitors[p].wait_until("Successfully joined group with generation " + str(current_generation), processor_found = self.extract_generation_from_logs(processor)
timeout_sec=60, first_other_processor_found = self.extract_generation_from_logs(first_other_processor)
err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(p.node.account)) second_other_processor_found = self.extract_generation_from_logs(second_other_processor)
if len(processor_found) > 0 and len(first_other_processor_found) > 0 and len(second_other_processor_found) > 0:
self.logger.info("processor: " + str(processor_found))
self.logger.info("first other processor: " + str(first_other_processor_found))
self.logger.info("second other processor: " + str(second_other_processor_found))
processor_generation = self.extract_highest_generation(processor_found)
first_other_processor_generation = self.extract_highest_generation(first_other_processor_found)
second_other_processor_generation = self.extract_highest_generation(second_other_processor_found)
if processor_generation == first_other_processor_generation and processor_generation == second_other_processor_generation:
current_generation = processor_generation
generation_synchronized = True
break
time.sleep(5)
retries = retries + 1
if generation_synchronized == False:
raise Exception("Never saw all three processors have the synchronized generation number")
if processor == self.leader: if processor == self.leader:
self.update_leader() self.update_leader()
@ -550,6 +560,12 @@ class StreamsUpgradeTest(Test):
return current_generation return current_generation
def extract_generation_from_logs(self, processor):
return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))
def extract_highest_generation(self, found_generations):
return int(found_generations[-1])
def verify_metadata_no_upgraded_yet(self): def verify_metadata_no_upgraded_yet(self):
for p in self.processors: for p in self.processors:
found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True)) found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))

Loading…
Cancel
Save