Browse Source

KAFKA-8748: Fix flaky testDescribeLogDirsRequest (#7182)

The introduction of KIP-480: Sticky Producer Partitioner had the
side effect that generateAndProduceMessages can often write
messages to a lower number of partitions to improve batching.

testDescribeLogDirsRequest (and potentially other tests) relies
on the messages being written somewhat uniformly to the topic
partitions. We fix the issue by including a monotonically
increasing key in the produced messages.

I also included a couple of minor clean-ups I noticed while
debugging the issue.

The test failed very frequently when executed locally before the
change and it passed 100 times consecutively after the change.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
pull/7174/head
Ismael Juma 5 years ago committed by GitHub
parent
commit
600cc48fa5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
  2. 5
      core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
  3. 7
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

3
core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala

@ -57,7 +57,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest { @@ -57,7 +57,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
val log1 = servers.head.logManager.getLog(tp1).get
assertEquals(log0.size, replicaInfo0.size)
assertEquals(log1.size, replicaInfo1.size)
assertTrue(servers.head.logManager.getLog(tp0).get.logEndOffset > 0)
val logEndOffset = servers.head.logManager.getLog(tp0).get.logEndOffset
assertTrue(s"LogEndOffset '$logEndOffset' should be > 0", logEndOffset > 0)
assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp0, log0.logEndOffset, false), replicaInfo0.offsetLag)
assertEquals(servers.head.replicaManager.getLogEndOffsetLag(tp1, log1.logEndOffset, false), replicaInfo1.offsetLag)
}

5
core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala

@ -155,10 +155,7 @@ object JaasTestUtils { @@ -155,10 +155,7 @@ object JaasTestUtils {
val serviceName = "kafka"
def saslConfigs(saslProperties: Option[Properties]): Properties = {
val result = saslProperties match {
case Some(properties) => properties
case None => new Properties
}
val result = saslProperties.getOrElse(new Properties)
// IBM Kerberos module doesn't support the serviceName JAAS property, hence it needs to be
// passed as a Kafka property
if (Java.isIbmJdk && !result.contains(KafkaConfig.SaslKerberosServiceNameProp))

7
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -53,7 +53,7 @@ import org.apache.kafka.common.internals.Topic @@ -53,7 +53,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.record._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
@ -1065,7 +1065,10 @@ object TestUtils extends Logging { @@ -1065,7 +1065,10 @@ object TestUtils extends Logging {
numMessages: Int,
acks: Int = -1): Seq[String] = {
val values = (0 until numMessages).map(x => s"test-$x")
val records = values.map(v => new ProducerRecord[Array[Byte], Array[Byte]](topic, v.getBytes))
val intSerializer = new IntegerSerializer()
val records = values.zipWithIndex.map { case (v, i) =>
new ProducerRecord(topic, intSerializer.serialize(topic, i), v.getBytes)
}
produceMessages(servers, records, acks)
values
}

Loading…
Cancel
Save