From 413db69d41f22b10295782e38890cb0077d6e21c Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Tue, 18 Feb 2020 01:58:02 +0800 Subject: [PATCH] KAFKA-8245: Fix Flaky Test DeleteConsumerGroupsTest#testDeleteCmdAllGroups (#8032) Change unit tests to make sure the consumer group is in Stable state (i.e. consumers have completed joining the group) --- .../unit/kafka/admin/DeleteConsumerGroupsTest.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala index 35828f0a21d..23de7904d5e 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala @@ -107,7 +107,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().contains(group) + service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" }, "The group did not initialize as expected.") executor.shutdown() @@ -137,7 +137,8 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().forall(groupId => groups.keySet.contains(groupId)) + service.listGroups().toSet == groups.keySet && + groups.keySet.forall(groupId => service.collectGroupState(groupId).state == "Stable") }, "The group did not initialize as expected.") // Shutdown consumers to empty out groups @@ -168,7 +169,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().contains(group) + service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" }, "The group did not initialize as expected.") executor.shutdown() @@ -193,7 +194,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().contains(group) + service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" }, "The group did not initialize as expected.") executor.shutdown() @@ -220,7 +221,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().contains(group) + service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" }, "The group did not initialize as expected.") executor.shutdown()