Browse Source

MINOR: Fix transiently failing consumer group admin integration test (#5067)

Since the producer is using retries=0, we need to await topic creation before sending any records.

Reviewers: Ismael Juma <ismael@juma.me.uk>
pull/5069/head
Jason Gustafson 7 years ago committed by GitHub
parent
commit
e8847205f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

7
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -963,8 +963,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -963,8 +963,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
}
/**
* Test the consumer group APIs.
*/
* Test the consumer group APIs.
*/
@Test
def testConsumerGroups(): Unit = {
val config = createConfig()
@ -979,6 +979,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -979,6 +979,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val testNumPartitions = 2
client.createTopics(Collections.singleton(
new NewTopic(testTopicName, testNumPartitions, 1))).all().get()
waitForTopics(client, List(testTopicName), List())
val producer = createNewProducer
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
@ -1044,7 +1046,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1044,7 +1046,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(2, result.all().get().size())
// Test listConsumerGroupOffsets
val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
TestUtils.waitUntilTrue(() => {
val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
val part = new TopicPartition(testTopicName, 0)

Loading…
Cancel
Save