
KAFKA-14580: Moving EndToEndLatency from core to tools module (#13095)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, Ismael Juma <mlists@juma.me.uk>
vamossagar12 committed bb3111f472 via GitHub, 2 years ago
6 changed files with 333 additions and 182 deletions:
  1. checkstyle/import-control.xml (+1)
  2. core/src/main/scala/kafka/tools/EndToEndLatency.scala (-179, file deleted)
  3. tests/kafkatest/benchmarks/core/benchmark_test.py (-1, +1)
  4. tests/kafkatest/services/performance/end_to_end_latency.py (-2, +7)
  5. tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java (+224, new file)
  6. tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java (+100, new file)

checkstyle/import-control.xml (+1)

@@ -408,6 +408,7 @@
 <subpackage name="tools">
   <allow pkg="org.apache.kafka.common"/>
   <allow pkg="org.apache.kafka.server.util" />
+  <allow pkg="org.apache.kafka.clients" />
   <allow pkg="org.apache.kafka.clients.admin" />
   <allow pkg="org.apache.kafka.clients.producer" />
   <allow pkg="org.apache.kafka.clients.consumer" />

core/src/main/scala/kafka/tools/EndToEndLatency.scala (-179, file deleted)

@@ -1,179 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.{Arrays, Collections, Properties}
import kafka.utils.Exit
import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import scala.jdk.CollectionConverters._
import scala.util.Random
/**
* This class records the average end to end latency for a single message to travel through Kafka
*
* broker_list = location of the bootstrap broker for both the producer and the consumer
* num_messages = # messages to send
* producer_acks = See ProducerConfig.ACKS_DOC
* message_size_bytes = size of each message in bytes
*
* e.g. [localhost:9092 test 10000 1 20]
*/
object EndToEndLatency {
private val timeout: Long = 60000
private val defaultReplicationFactor: Short = 1
private val defaultNumPartitions: Int = 1
def main(args: Array[String]): Unit = {
if (args.length != 5 && args.length != 6) {
System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file")
Exit.exit(1)
}
val brokerList = args(0)
val topic = args(1)
val numMessages = args(2).toInt
val producerAcks = args(3)
val messageLen = args(4).toInt
val propsFile = if (args.length > 5) Some(args(5)).filter(_.nonEmpty) else None
if (!List("1", "all").contains(producerAcks))
throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")
def loadPropsWithBootstrapServers: Properties = {
val props = propsFile.map(Utils.loadProps).getOrElse(new Properties())
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props
}
val consumerProps = loadPropsWithBootstrapServers
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
val producerProps = loadPropsWithBootstrapServers
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
def finalise(): Unit = {
consumer.commitSync()
producer.close()
consumer.close()
}
// create topic if it does not exist
if (!consumer.listTopics().containsKey(topic)) {
try {
createTopic(topic, loadPropsWithBootstrapServers)
} catch {
case t: Throwable =>
finalise()
throw new RuntimeException(s"Failed to create topic $topic", t)
}
}
val topicPartitions = consumer.partitionsFor(topic).asScala
.map(p => new TopicPartition(p.topic(), p.partition())).asJava
consumer.assign(topicPartitions)
consumer.seekToEnd(topicPartitions)
consumer.assignment.forEach(consumer.position(_))
var totalTime = 0.0
val latencies = new Array[Long](numMessages)
val random = new Random(0)
for (i <- 0 until numMessages) {
val message = randomBytesOfLen(random, messageLen)
val begin = System.nanoTime
//Send message (of random bytes) synchronously then immediately poll for it
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator
val elapsed = System.nanoTime - begin
//Check we got results
if (!recordIter.hasNext) {
finalise()
throw new RuntimeException(s"poll() timed out before finding a result (timeout:[$timeout])")
}
//Check result matches the original record
val sent = new String(message, StandardCharsets.UTF_8)
val read = new String(recordIter.next().value(), StandardCharsets.UTF_8)
if (!read.equals(sent)) {
finalise()
throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
}
//Check we only got the one message
if (recordIter.hasNext) {
val count = 1 + recordIter.asScala.size
throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
}
//Report progress
if (i % 1000 == 0)
println(i.toString + "\t" + elapsed / 1000.0 / 1000.0)
totalTime += elapsed
latencies(i) = elapsed / 1000 / 1000
}
//Results
println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
Arrays.sort(latencies)
val p50 = latencies((latencies.length * 0.5).toInt)
val p99 = latencies((latencies.length * 0.99).toInt)
val p999 = latencies((latencies.length * 0.999).toInt)
println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
finalise()
}
def randomBytesOfLen(random: Random, len: Int): Array[Byte] = {
Array.fill(len)((random.nextInt(26) + 65).toByte)
}
def createTopic(topic: String, props: Properties): Unit = {
println("Topic \"%s\" does not exist. Will create topic with %d partition(s) and replication factor = %d"
.format(topic, defaultNumPartitions, defaultReplicationFactor))
val adminClient = Admin.create(props)
val newTopic = new NewTopic(topic, defaultNumPartitions, defaultReplicationFactor)
try adminClient.createTopics(Collections.singleton(newTopic)).all().get()
finally Utils.closeQuietly(adminClient, "AdminClient")
}
}

tests/kafkatest/benchmarks/core/benchmark_test.py (-1, +1)

@@ -173,7 +173,7 @@ class Benchmark(Test):
         Return aggregate latency statistics.
-        (Under the hood, this simply runs EndToEndLatency.scala)
+        (Under the hood, this simply runs EndToEndLatency.java)
         """
         client_version = KafkaVersion(client_version)
         broker_version = KafkaVersion(broker_version)

tests/kafkatest/services/performance/end_to_end_latency.py (-2, +7)

@@ -17,7 +17,7 @@ import os
 from kafkatest.services.performance import PerformanceService
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH
+from kafkatest.version import get_version, V_3_4_0, DEV_BRANCH
@@ -50,6 +50,7 @@ class EndToEndLatencyService(PerformanceService):
                              root=EndToEndLatencyService.PERSISTENT_ROOT)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
+        self.version = ''
         security_protocol = self.security_config.security_protocol
@@ -73,6 +74,7 @@ class EndToEndLatencyService(PerformanceService):
     def start_cmd(self, node):
         args = self.args.copy()
+        self.version = get_version(node)
         args.update({
             'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
             'config_file': EndToEndLatencyService.CONFIG_FILE,
@@ -124,4 +126,7 @@ class EndToEndLatencyService(PerformanceService):
         self.results[idx-1] = results

     def java_class_name(self):
-        return "kafka.tools.EndToEndLatency"
+        if self.version <= V_3_4_0:
+            return "kafka.tools.EndToEndLatency"
+        else:
+            return "org.apache.kafka.tools.EndToEndLatency"

tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java (+224, new file)

@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.List;
import java.util.Random;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* This class records the average end to end latency for a single message to travel through Kafka.
* Following are the required arguments
* <p> broker_list = location of the bootstrap broker for both the producer and the consumer </p>
* <p> topic = topic name used by both the producer and the consumer to send/receive messages </p>
* <p> num_messages = # messages to send </p>
* <p> producer_acks = See ProducerConfig.ACKS_DOC </p>
* <p> message_size_bytes = size of each message in bytes </p>
*
* <p> e.g. [localhost:9092 test 10000 1 20] </p>
*/
public class EndToEndLatency {
private final static long POLL_TIMEOUT_MS = 60000;
private final static short DEFAULT_REPLICATION_FACTOR = 1;
private final static int DEFAULT_NUM_PARTITIONS = 1;
public static void main(String... args) {
Exit.exit(mainNoExit(args));
}
static int mainNoExit(String... args) {
try {
execute(args);
return 0;
} catch (TerseException e) {
System.err.println(e.getMessage());
return 1;
} catch (Throwable e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
return 1;
}
}
// Visible for testing
static void execute(String... args) throws Exception {
if (args.length != 5 && args.length != 6) {
throw new TerseException("USAGE: java " + EndToEndLatency.class.getName()
+ " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file");
}
String brokers = args[0];
String topic = args[1];
int numMessages = Integer.parseInt(args[2]);
String acks = args[3];
int messageSizeBytes = Integer.parseInt(args[4]);
Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
if (!Arrays.asList("1", "all").contains(acks)) {
throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
}
try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
if (!consumer.listTopics().containsKey(topic)) {
createTopic(propertiesFile, brokers, topic);
}
setupConsumer(topic, consumer);
double totalTime = 0.0;
long[] latencies = new long[numMessages];
Random random = new Random(0);
for (int i = 0; i < numMessages; i++) {
byte[] message = randomBytesOfLen(random, messageSizeBytes);
long begin = System.nanoTime();
//Send message (of random bytes) synchronously then immediately poll for it
producer.send(new ProducerRecord<>(topic, message)).get();
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
long elapsed = System.nanoTime() - begin;
validate(consumer, message, records);
//Report progress
if (i % 1000 == 0)
System.out.println(i + "\t" + elapsed / 1000.0 / 1000.0);
totalTime += elapsed;
latencies[i] = elapsed / 1000 / 1000;
}
printResults(numMessages, totalTime, latencies);
consumer.commitSync();
}
}
// Visible for testing
static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) {
if (records.isEmpty()) {
consumer.commitSync();
throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])");
}
//Check result matches the original record
String sent = new String(message, StandardCharsets.UTF_8);
String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8);
if (!read.equals(sent)) {
consumer.commitSync();
throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]");
}
//Check we only got the one message
if (records.count() != 1) {
int count = records.count();
consumer.commitSync();
throw new RuntimeException("Only one result was expected during this test. We found [" + count + "]");
}
}
private static void setupConsumer(String topic, KafkaConsumer<byte[], byte[]> consumer) {
List<TopicPartition> topicPartitions = consumer
.partitionsFor(topic)
.stream()
.map(p -> new TopicPartition(p.topic(), p.partition()))
.collect(Collectors.toList());
consumer.assign(topicPartitions);
consumer.seekToEnd(topicPartitions);
consumer.assignment().forEach(consumer::position);
}
private static void printResults(int numMessages, double totalTime, long[] latencies) {
System.out.printf("Avg latency: %.4f ms%n", totalTime / numMessages / 1000.0 / 1000.0);
Arrays.sort(latencies);
int p50 = (int) latencies[(int) (latencies.length * 0.5)];
int p99 = (int) latencies[(int) (latencies.length * 0.99)];
int p999 = (int) latencies[(int) (latencies.length * 0.999)];
System.out.printf("Percentiles: 50th = %d, 99th = %d, 99.9th = %d%n", p50, p99, p999);
}
private static byte[] randomBytesOfLen(Random random, int length) {
byte[] randomBytes = new byte[length];
Arrays.fill(randomBytes, Integer.valueOf(random.nextInt(26) + 65).byteValue());
return randomBytes;
}
private static void createTopic(Optional<String> propertiesFile, String brokers, String topic) throws IOException {
System.out.printf("Topic \"%s\" does not exist. "
+ "Will create topic with %d partition(s) and replication factor = %d%n",
topic, DEFAULT_NUM_PARTITIONS, DEFAULT_REPLICATION_FACTOR);
Properties adminProps = loadPropsWithBootstrapServers(propertiesFile, brokers);
Admin adminClient = Admin.create(adminProps);
NewTopic newTopic = new NewTopic(topic, DEFAULT_NUM_PARTITIONS, DEFAULT_REPLICATION_FACTOR);
try {
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
} catch (ExecutionException | InterruptedException e) {
System.out.printf("Creation of topic %s failed%n", topic);
throw new RuntimeException(e);
} finally {
Utils.closeQuietly(adminClient, "AdminClient");
}
}
private static Properties loadPropsWithBootstrapServers(Optional<String> propertiesFile, String brokers) throws IOException {
Properties properties = propertiesFile.isPresent() ? Utils.loadProps(propertiesFile.get()) : new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
return properties;
}
private static KafkaConsumer<byte[], byte[]> createKafkaConsumer(Optional<String> propsFile, String brokers) throws IOException {
Properties consumerProps = loadPropsWithBootstrapServers(propsFile, brokers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis());
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0"); //ensure we have no temporal batching
return new KafkaConsumer<>(consumerProps);
}
private static KafkaProducer<byte[], byte[]> createKafkaProducer(Optional<String> propsFile, String brokers, String acks) throws IOException {
Properties producerProps = loadPropsWithBootstrapServers(propsFile, brokers);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0"); //ensure writes are synchronous
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
producerProps.put(ProducerConfig.ACKS_CONFIG, acks);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(producerProps);
}
}
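For reference, the relocated tool keeps the same positional arguments as the Scala original, so callers only need to update the class name. The following is a minimal sketch of a programmatic invocation, not part of this commit: it assumes the caller sits in the same org.apache.kafka.tools package (execute is package-private) and that a broker is listening on localhost:9092 with a topic named "test" — both illustrative values.

package org.apache.kafka.tools;

// Hypothetical driver class, for illustration only.
public class EndToEndLatencyDriver {
    public static void main(String[] args) throws Exception {
        // Positional arguments, as documented on the class:
        // broker_list topic num_messages producer_acks message_size_bytes
        EndToEndLatency.execute("localhost:9092", "test", "10000", "1", "20");
    }
}

From a shell, the equivalent would be to launch the class through Kafka's standard class runner, e.g. bin/kafka-run-class.sh org.apache.kafka.tools.EndToEndLatency localhost:9092 test 10000 1 20.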

tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java (+100, new file)

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class EndToEndLatencyTest {
@Mock
KafkaConsumer<byte[], byte[]> consumer;
@Mock
ConsumerRecords<byte[], byte[]> records;
@Test
public void shouldFailWhenSuppliedUnexpectedArgs() {
String[] args = new String[] {"localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random"};
assertThrows(TerseException.class, () -> EndToEndLatency.execute(args));
}
@Test
public void shouldFailWhenProducerAcksAreNotSynchronised() {
String[] args = new String[] {"localhost:9092", "test", "10000", "0", "200"};
assertThrows(IllegalArgumentException.class, () -> EndToEndLatency.execute(args));
}
@Test
public void shouldFailWhenConsumerRecordsIsEmpty() {
when(records.isEmpty()).thenReturn(true);
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, new byte[0], records));
}
@Test
@SuppressWarnings("unchecked")
public void shouldFailWhenSentIsNotEqualToReceived() {
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
when(records.isEmpty()).thenReturn(false);
when(records.iterator()).thenReturn(iterator);
when(iterator.next()).thenReturn(record);
when(record.value()).thenReturn("kafkab".getBytes(StandardCharsets.UTF_8));
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
}
@Test
@SuppressWarnings("unchecked")
public void shouldFailWhenReceivedMoreThanOneRecord() {
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
when(records.isEmpty()).thenReturn(false);
when(records.iterator()).thenReturn(iterator);
when(iterator.next()).thenReturn(record);
when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8));
when(records.count()).thenReturn(2);
assertThrows(RuntimeException.class, () -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
}
@Test
@SuppressWarnings("unchecked")
public void shouldPassInValidation() {
Iterator<ConsumerRecord<byte[], byte[]>> iterator = mock(Iterator.class);
ConsumerRecord<byte[], byte[]> record = mock(ConsumerRecord.class);
when(records.isEmpty()).thenReturn(false);
when(records.iterator()).thenReturn(iterator);
when(iterator.next()).thenReturn(record);
when(record.value()).thenReturn("kafkaa".getBytes(StandardCharsets.UTF_8));
when(records.count()).thenReturn(1);
assertDoesNotThrow(() -> EndToEndLatency.validate(consumer, "kafkaa".getBytes(StandardCharsets.UTF_8), records));
}
}
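As an aside, the @SuppressWarnings("unchecked") annotations above are only needed because the tests mock the generic Iterator and ConsumerRecord types. A mock-free alternative for the records argument is to build a real ConsumerRecords from a singleton map, as in this sketch (the fixture class and helper are invented for illustration; topic, partition and offset values are arbitrary):

package org.apache.kafka.tools;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

// Hypothetical test fixture: a real ConsumerRecords holding exactly one record.
class EndToEndLatencyTestFixture {
    static ConsumerRecords<byte[], byte[]> singleRecord(String payload) {
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            "test", 0, 0L, new byte[0], payload.getBytes(StandardCharsets.UTF_8));
        return new ConsumerRecords<>(Collections.singletonMap(
            new TopicPartition("test", 0), Collections.singletonList(record)));
    }
}

With such a fixture, EndToEndLatency.validate(consumer, "kafka".getBytes(StandardCharsets.UTF_8), EndToEndLatencyTestFixture.singleRecord("kafka")) passes without stubbing isEmpty(), iterator() or count(), since a real ConsumerRecords keeps the three consistent.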