diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala index 35828f0a21d..23de7904d5e 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala @@ -107,7 +107,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().contains(group) + service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" }, "The group did not initialize as expected.") executor.shutdown() @@ -137,7 +137,8 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().forall(groupId => groups.keySet.contains(groupId)) + service.listGroups().toSet == groups.keySet && + groups.keySet.forall(groupId => service.collectGroupState(groupId).state == "Stable") }, "The group did not initialize as expected.") // Shutdown consumers to empty out groups @@ -168,7 +169,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().contains(group) + service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" }, "The group did not initialize as expected.") executor.shutdown() @@ -193,7 +194,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().contains(group) + service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" }, "The group did not initialize as expected.") executor.shutdown() @@ -220,7 +221,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { - service.listGroups().contains(group) + service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" }, "The group did not initialize as expected.") executor.shutdown()