|
|
|
@ -107,7 +107,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
@@ -107,7 +107,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
|
|
|
|
val service = getConsumerGroupService(cgcArgs) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => { |
|
|
|
|
service.listGroups().contains(group) |
|
|
|
|
service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" |
|
|
|
|
}, "The group did not initialize as expected.") |
|
|
|
|
|
|
|
|
|
executor.shutdown() |
|
|
|
@ -137,7 +137,8 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
@@ -137,7 +137,8 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
|
|
|
|
val service = getConsumerGroupService(cgcArgs) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => { |
|
|
|
|
service.listGroups().forall(groupId => groups.keySet.contains(groupId)) |
|
|
|
|
service.listGroups().toSet == groups.keySet && |
|
|
|
|
groups.keySet.forall(groupId => service.collectGroupState(groupId).state == "Stable") |
|
|
|
|
}, "The group did not initialize as expected.") |
|
|
|
|
|
|
|
|
|
// Shutdown consumers to empty out groups |
|
|
|
@ -168,7 +169,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
@@ -168,7 +169,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
|
|
|
|
val service = getConsumerGroupService(cgcArgs) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => { |
|
|
|
|
service.listGroups().contains(group) |
|
|
|
|
service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" |
|
|
|
|
}, "The group did not initialize as expected.") |
|
|
|
|
|
|
|
|
|
executor.shutdown() |
|
|
|
@ -193,7 +194,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
@@ -193,7 +194,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
|
|
|
|
val service = getConsumerGroupService(cgcArgs) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => { |
|
|
|
|
service.listGroups().contains(group) |
|
|
|
|
service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" |
|
|
|
|
}, "The group did not initialize as expected.") |
|
|
|
|
|
|
|
|
|
executor.shutdown() |
|
|
|
@ -220,7 +221,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
@@ -220,7 +221,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
|
|
|
|
|
val service = getConsumerGroupService(cgcArgs) |
|
|
|
|
|
|
|
|
|
TestUtils.waitUntilTrue(() => { |
|
|
|
|
service.listGroups().contains(group) |
|
|
|
|
service.listGroups().contains(group) && service.collectGroupState(group).state == "Stable" |
|
|
|
|
}, "The group did not initialize as expected.") |
|
|
|
|
|
|
|
|
|
executor.shutdown() |
|
|
|
|