Browse Source

KAFKA-5959; Fix NPE in Sender.canRetry when idempotence is not enabled

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: tedyu <yuzhihong@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #3947 from apurvam/KAFKA-5959-npe-in-sender
pull/3864/merge
Apurva Mehta 7 years ago committed by Jason Gustafson
parent
commit
8f90fd6530
  1. 3
      clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  2. 30
      clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

3
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@ -636,7 +636,8 @@ public class Sender implements Runnable { @@ -636,7 +636,8 @@ public class Sender implements Runnable {
*/
private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
return batch.attempts() < this.retries &&
((response.error.exception() instanceof RetriableException) || transactionManager.canRetry(response, batch));
((response.error.exception() instanceof RetriableException) ||
(transactionManager != null && transactionManager.canRetry(response, batch)));
}
/**

30
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@ -500,6 +501,35 @@ public class SenderTest { @@ -500,6 +501,35 @@ public class SenderTest {
assertSendFailure(ClusterAuthorizationException.class);
}
@Test
public void testCanRetryWithoutIdempotence() throws Exception {
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
String id = client.requests().peek().destination();
Node node = new Node(Integer.parseInt(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue(client.hasInFlightRequests());
assertTrue("Client ready status should be true", client.isReady(node, 0L));
assertFalse(future.isDone());
client.respond(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
ProduceRequest request = (ProduceRequest) body;
assertFalse(request.isIdempotent());
return true;
}
}, produceResponse(tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0));
sender.run(time.milliseconds());
assertTrue(future.isDone());
try {
future.get();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TopicAuthorizationException);
}
}
@Test
public void testIdempotenceWithMultipleInflights() throws Exception {

Loading…
Cancel
Save