
KAFKA-6120: RecordCollector should not retry sending

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #4148 from mjsax/kafka-6120-recordCollector
Authored by Matthias J. Sax, committed by Guozhang Wang
commit 2b5a21395c
Changed files:
  1. streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java (101 lines changed)
  2. streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java (55 lines changed)
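
This commit removes the RecordCollector's internal retry loop, so transient send failures are now left to the producer itself, as the new error hint in the diff below suggests. A minimal sketch of raising those producer settings from a Streams application, assuming the standard StreamsConfig.producerPrefix helper and ProducerConfig constants (application id, broker address, and values are illustrative only, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    // Sketch: route producer-level overrides through StreamsConfig so that
    // transient send failures are retried by the producer, not by Streams.
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");            // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // illustrative
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 100);
    // For a TimeoutException thrown directly from send(), the new log message
    // points at max.block.ms instead of the retry settings.
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120_000);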

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java (101 lines changed)

@@ -24,10 +24,10 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
@@ -37,15 +37,15 @@ import java.util.List;
import java.util.Map;
public class RecordCollectorImpl implements RecordCollector {
private static final int MAX_SEND_ATTEMPTS = 3;
private static final long SEND_RETRY_BACKOFF = 100L;
private final Logger log;
private final Producer<byte[], byte[]> producer;
private final Map<TopicPartition, Long> offsets;
private final String logPrefix;
private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
"No more records will be sent and no more offsets will be recorded for this task.";
private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
private volatile KafkaException sendException;
public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId, final LogContext logContext) {
@@ -93,43 +93,70 @@ public class RecordCollectorImpl implements RecordCollector {
final ProducerRecord<byte[], byte[]> serializedRecord =
new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
// counting from 1 to make check further down more natural
// -> `if (attempt == MAX_SEND_ATTEMPTS)`
for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; ++attempt) {
try {
producer.send(serializedRecord, new Callback() {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception == null) {
if (sendException != null) {
return;
}
final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
offsets.put(tp, metadata.offset());
} else {
if (sendException == null) {
log.error("Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
"No more records will be sent and no more offsets will be recorded for this task.",
key, value, timestamp, topic, exception);
if (exception instanceof ProducerFencedException) {
sendException = new ProducerFencedException(String.format("%sAbort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
logPrefix, key, value, timestamp, topic, exception.getMessage()));
} else {
sendException = new StreamsException(String.format("%sAbort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
logPrefix, key, value, timestamp, topic, exception), exception);
try {
producer.send(serializedRecord, new Callback() {
@Override
public void onCompletion(final RecordMetadata metadata,
final Exception exception) {
if (exception == null) {
if (sendException != null) {
return;
}
final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
offsets.put(tp, metadata.offset());
} else {
if (sendException == null) {
if (exception instanceof ProducerFencedException) {
log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception);
sendException = new ProducerFencedException(
String.format(EXCEPTION_MESSAGE,
logPrefix,
"producer got fenced",
key,
value,
timestamp,
topic,
exception.getMessage()));
} else {
String errorLogMessage = LOG_MESSAGE;
String errorMessage = EXCEPTION_MESSAGE;
if (exception instanceof RetriableException) {
errorLogMessage += PARAMETER_HINT;
errorMessage += PARAMETER_HINT;
}
log.error(errorLogMessage, key, value, timestamp, topic, exception);
sendException = new StreamsException(
String.format(errorMessage,
logPrefix,
"an error caught",
key,
value,
timestamp,
topic,
exception.getMessage()),
exception);
}
}
}
});
return;
} catch (final TimeoutException e) {
if (attempt == MAX_SEND_ATTEMPTS) {
throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
}
log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt);
Utils.sleep(SEND_RETRY_BACKOFF);
}
});
} catch (final TimeoutException e) {
log.error("Timeout exception caught when sending record to topic {}. " +
"This might happen if the producer cannot send data to the Kafka cluster and thus, " +
"its internal buffer fills up. " +
"You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
} catch (final Exception fatalException) {
throw new StreamsException(
String.format(EXCEPTION_MESSAGE,
logPrefix,
"an error caught",
key,
value,
timestamp,
topic,
fatalException.getMessage()),
fatalException);
}
}
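
The error-propagation pattern the new code relies on (record the first asynchronous failure, then rethrow it on the next send, flush, or close) can be summarized in isolation. A simplified, hypothetical sketch of that pattern, not the actual Streams code:

    // Hypothetical illustration of "capture first async failure, rethrow later";
    // the class and method names here are not part of the Kafka Streams API.
    class AsyncErrorHolder {
        private volatile RuntimeException firstError;

        void onAsyncFailure(final RuntimeException e) {
            if (firstError == null) {   // keep only the first failure
                firstError = e;
            }
        }

        void checkForFailure() {        // called at the start of send()/flush()/close()
            if (firstError != null) {
                throw firstError;
            }
        }
    }

The updated tests below exercise exactly this behavior: after one failed send, the next send, flush, or close throws a StreamsException.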

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java (55 lines changed)

@@ -22,10 +22,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
@@ -38,9 +38,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class RecordCollectorTest {
@@ -124,47 +124,24 @@ public class RecordCollectorTest {
assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
}
@SuppressWarnings("unchecked")
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() {
final AtomicInteger attempt = new AtomicInteger(0);
final RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
if (attempt.getAndIncrement() == 0) {
throw new TimeoutException();
}
return super.send(record, callback);
}
},
"test",
logContext);
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
assertEquals(Long.valueOf(0L), offset);
}
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() {
public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
throw new TimeoutException();
throw new KafkaException();
}
},
"test",
logContext);
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
@Test
public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() {
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -177,11 +154,15 @@ public class RecordCollectorTest {
"test",
logContext);
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
try {
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
fail("Should have thrown StreamsException");
} catch (final StreamsException expected) { /* ok */ }
}
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
@Test
public void shouldThrowStreamsExceptionOnFlushIfASendFailed() {
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -194,11 +175,15 @@ public class RecordCollectorTest {
"test",
logContext);
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.flush();
try {
collector.flush();
fail("Should have thrown StreamsException");
} catch (final StreamsException expected) { /* ok */ }
}
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
@Test
public void shouldThrowStreamsExceptionOnCloseIfASendFailed() {
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -211,7 +196,11 @@ public class RecordCollectorTest {
"test",
logContext);
collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
collector.close();
try {
collector.close();
fail("Should have thrown StreamsException");
} catch (final StreamsException expected) { /* ok */ }
}
@SuppressWarnings("unchecked")
