Browse Source

Revert "MINOR: Fixed ProducerPerformance still counting successful sending when sending failed (#13348)" (#13401)

This reverts commit 8e4c0d0b04.

Reviewers: Luke Chen <showuon@gmail.com>
pull/13404/head
Chia-Ping Tsai 2 years ago committed by GitHub
parent
commit
279c237632
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
  2. 45
      tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java

28
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java

@ -94,7 +94,7 @@ public class ProducerPerformance { @@ -94,7 +94,7 @@ public class ProducerPerformance {
}
Random random = new Random(0);
ProducerRecord<byte[], byte[]> record;
stats = new Stats(numRecords, 5000);
Stats stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
@ -113,7 +113,7 @@ public class ProducerPerformance { @@ -113,7 +113,7 @@ public class ProducerPerformance {
record = new ProducerRecord<>(topicName, payload);
long sendStartMs = System.currentTimeMillis();
cb = stats.nextCompletion(sendStartMs, payload.length);
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
currentTransactionSize++;
@ -164,10 +164,6 @@ public class ProducerPerformance { @@ -164,10 +164,6 @@ public class ProducerPerformance {
return new KafkaProducer<>(props);
}
Callback cb;
Stats stats;
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
Random random) {
if (!payloadByteList.isEmpty()) {
@ -387,16 +383,9 @@ public class ProducerPerformance { @@ -387,16 +383,9 @@ public class ProducerPerformance {
}
}
public long totalCount() {
return this.count;
}
public long currentWindowCount() {
return this.windowCount;
}
public Callback nextCompletion(long start, int bytes) {
Callback cb = new PerfCallback(this.iteration, start, bytes, this);
public Callback nextCompletion(long start, int bytes, Stats stats) {
Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
this.iteration++;
return cb;
}
@ -465,12 +454,7 @@ public class ProducerPerformance { @@ -465,12 +454,7 @@ public class ProducerPerformance {
public void onCompletion(RecordMetadata metadata, Exception exception) {
long now = System.currentTimeMillis();
int latency = (int) (now - start);
// It will only be counted when the sending is successful, otherwise the number of sent records may be
// magically printed when the sending fails.
if (exception == null) {
this.stats.record(iteration, latency, bytes, now);
this.stats.iteration++;
}
this.stats.record(iteration, latency, bytes, now);
if (exception != null)
exception.printStackTrace();
}

45
tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java

@ -17,7 +17,6 @@ @@ -17,7 +17,6 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@ -43,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -43,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -106,49 +104,6 @@ public class ProducerPerformanceTest { @@ -106,49 +104,6 @@ public class ProducerPerformanceTest {
verify(producerMock, times(1)).close();
}
@Test
public void testNumberOfSuccessfulSendAndClose() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, null);
return null;
}).when(producerMock).send(any(), any());
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);
verify(producerMock, times(10)).send(any(), any());
assertEquals(10, producerPerformanceSpy.stats.totalCount());
verify(producerMock, times(1)).close();
}
@Test
public void testNumberOfFailedSendAndClose() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, new AuthorizationException("not authorized."));
return null;
}).when(producerMock).send(any(), any());
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);
verify(producerMock, times(10)).send(any(), any());
assertEquals(0, producerPerformanceSpy.stats.currentWindowCount());
assertEquals(0, producerPerformanceSpy.stats.totalCount());
verify(producerMock, times(1)).close();
}
@Test
public void testUnexpectedArg() {
String[] args = new String[] {

Loading…
Cancel
Save