Browse Source

KAFKA-9274: Gracefully handle timeout exception (#8060)

1. Delay the initialization (producer.initTxn) from construction to maybeInitialize; if it times out we just swallow and retry in the next iteration.

2. If completeRestoration (consumer.committed) times out, just swallow and retry in the next iteration.

3. For other calls (producer.partitionsFor, producer.commitTxn, consumer.commit), treat the timeout exception as fatal.

Reviewers: Matthias J. Sax <matthias@confluent.io>
pull/8125/head
Guozhang Wang 5 years ago committed by GitHub
parent
commit
d8756e81c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  2. 7
      streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
  3. 33
      streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
  4. 13
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  5. 9
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  6. 25
      streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
  7. 4
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
  8. 3
      streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java

14
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

@ -42,6 +42,7 @@ import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse; import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupRequest;
@ -749,7 +750,7 @@ public class KafkaProducerTest {
public void testInitTransactionTimeout() { public void testInitTransactionTimeout() {
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 500);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Time time = new MockTime(1); Time time = new MockTime(1);
@ -761,7 +762,18 @@ public class KafkaProducerTest {
try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) { new StringSerializer(), metadata, client, null, time)) {
client.prepareResponse(
request -> request instanceof FindCoordinatorRequest &&
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
assertThrows(TimeoutException.class, producer::initTransactions); assertThrows(TimeoutException.class, producer::initTransactions);
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
// retry initialization should work
producer.initTransactions();
} }
} }

7
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java

@ -47,6 +47,13 @@ public interface RecordCollector extends AutoCloseable {
void commit(final Map<TopicPartition, OffsetAndMetadata> offsets); void commit(final Map<TopicPartition, OffsetAndMetadata> offsets);
/**
* Initialize the internal {@link Producer}; note this function should be made idempotent
*
* @throws org.apache.kafka.common.errors.TimeoutException if producer initializing txn id timed out
*/
void initialize();
/** /**
* Flush the internal {@link Producer}. * Flush the internal {@link Producer}.
*/ */

33
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
@ -70,6 +71,7 @@ public class RecordCollectorImpl implements RecordCollector {
// used when eosEnabled is true only // used when eosEnabled is true only
private boolean transactionInFlight = false; private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
private Producer<byte[], byte[]> producer; private Producer<byte[], byte[]> producer;
private volatile KafkaException sendException; private volatile KafkaException sendException;
@ -95,24 +97,30 @@ public class RecordCollectorImpl implements RecordCollector {
this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), streamsMetrics); this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), streamsMetrics);
producer = producerSupplier.get(taskId); producer = producerSupplier.get(taskId);
}
@Override
public void initialize() {
maybeInitTxns(); maybeInitTxns();
} }
private void maybeInitTxns() { private void maybeInitTxns() {
if (eosEnabled) { if (eosEnabled && !transactionInitialized) {
// initialize transactions if eos is turned on, which will block if the previous transaction has not // initialize transactions if eos is turned on, which will block if the previous transaction has not
// completed yet; do not start the first transaction until the topology has been initialized later // completed yet; do not start the first transaction until the topology has been initialized later
try { try {
producer.initTransactions(); producer.initTransactions();
transactionInitialized = true;
} catch (final TimeoutException exception) { } catch (final TimeoutException exception) {
final String errorMessage = "Timeout exception caught when initializing transactions for task " + taskId + ". " + log.warn("Timeout exception caught when initializing transactions for task {}. " +
"\nThe broker is either slow or in bad state (like not having enough replicas) in responding to the request, " + "\nThe broker is either slow or in bad state (like not having enough replicas) in responding to the request, " +
"or the connection to broker was interrupted sending the request or receiving the response. " + "or the connection to broker was interrupted sending the request or receiving the response. " +
"\n Consider overwriting `max.block.ms` to a larger value to avoid timeout errors"; "Would retry initializing the task in the next loop." +
"\nConsider overwriting producer config {} to a larger value to avoid timeout errors",
ProducerConfig.MAX_BLOCK_MS_CONFIG, taskId);
// TODO K9113: we do NOT try to handle timeout exception here but throw it as a fatal error throw exception;
throw new StreamsException(errorMessage, exception);
} catch (final KafkaException exception) { } catch (final KafkaException exception) {
throw new StreamsException("Error encountered while initializing transactions for task " + taskId, exception); throw new StreamsException("Error encountered while initializing transactions for task " + taskId, exception);
} }
@ -163,7 +171,7 @@ public class RecordCollectorImpl implements RecordCollector {
} catch (final ProducerFencedException error) { } catch (final ProducerFencedException error) {
throw new TaskMigratedException(taskId, "Producer get fenced trying to commit a transaction", error); throw new TaskMigratedException(taskId, "Producer get fenced trying to commit a transaction", error);
} catch (final TimeoutException error) { } catch (final TimeoutException error) {
// TODO K9113: currently handle timeout exception as a fatal error, should discuss whether we want to handle it // TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level
throw new StreamsException("Timed out while committing transaction via producer for task " + taskId, error); throw new StreamsException("Timed out while committing transaction via producer for task " + taskId, error);
} catch (final KafkaException error) { } catch (final KafkaException error) {
throw new StreamsException("Error encountered sending offsets and committing transaction " + throw new StreamsException("Error encountered sending offsets and committing transaction " +
@ -176,7 +184,7 @@ public class RecordCollectorImpl implements RecordCollector {
throw new TaskMigratedException(taskId, "Consumer committing offsets failed, " + throw new TaskMigratedException(taskId, "Consumer committing offsets failed, " +
"indicating the corresponding thread is no longer part of the group.", error); "indicating the corresponding thread is no longer part of the group.", error);
} catch (final TimeoutException error) { } catch (final TimeoutException error) {
// TODO K9113: currently handle timeout exception as a fatal error // TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level
throw new StreamsException("Timed out while committing offsets via consumer for task " + taskId, error); throw new StreamsException("Timed out while committing offsets via consumer for task " + taskId, error);
} catch (final KafkaException error) { } catch (final KafkaException error) {
throw new StreamsException("Error encountered committing offsets via consumer for task " + taskId, error); throw new StreamsException("Error encountered committing offsets via consumer for task " + taskId, error);
@ -244,9 +252,16 @@ public class RecordCollectorImpl implements RecordCollector {
final StreamPartitioner<? super K, ? super V> partitioner) { final StreamPartitioner<? super K, ? super V> partitioner) {
final Integer partition; final Integer partition;
// TODO K9113: we need to decide how to handle exceptions from partitionsFor
if (partitioner != null) { if (partitioner != null) {
final List<PartitionInfo> partitions = producer.partitionsFor(topic); final List<PartitionInfo> partitions;
try {
partitions = producer.partitionsFor(topic);
} catch (final KafkaException e) {
// here we cannot drop the message on the floor even if it is a transient timeout exception,
// so we treat everything the same as a fatal exception
throw new StreamsException("Could not determine the number of partitions for topic '" + topic +
"' for task " + taskId + " due to " + e.toString());
}
if (partitions.size() > 0) { if (partitions.size() > 0) {
partition = partitioner.partition(topic, key, value, partitions.size()); partition = partitioner.partition(topic, key, value, partitions.size());
} else { } else {

13
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals; package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
@ -186,11 +187,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
/** /**
* @throws LockException could happen when multi-threads within the single instance, could retry * @throws LockException could happen when multi-threads within the single instance, could retry
* @throws TimeoutException if initializing record collector timed out
* @throws StreamsException fatal error, should close the thread * @throws StreamsException fatal error, should close the thread
*/ */
@Override @Override
public void initializeIfNeeded() { public void initializeIfNeeded() {
if (state() == State.CREATED) { if (state() == State.CREATED) {
recordCollector.initialize();
StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext); StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);
transitionTo(State.RESTORING); transitionTo(State.RESTORING);
@ -199,6 +203,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
} }
} }
/**
* @throws TimeoutException if fetching committed offsets timed out
*/
@Override @Override
public void completeRestoration() { public void completeRestoration() {
if (state() == State.RESTORING) { if (state() == State.RESTORING) {
@ -612,6 +619,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
.filter(e -> e.getValue() != null) .filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
initializeTaskTime(offsetsAndMetadata); initializeTaskTime(offsetsAndMetadata);
} catch (final TimeoutException e) {
log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
"\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
throw e;
} catch (final KafkaException e) { } catch (final KafkaException e) {
throw new StreamsException(format("task [%s] Failed to initialize offsets for %s", id, partitions), e); throw new StreamsException(format("task [%s] Failed to initialize offsets for %s", id, partitions), e);
} }

9
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
@ -214,7 +215,7 @@ public class TaskManager {
if (task.state() == CREATED) { if (task.state() == CREATED) {
try { try {
task.initializeIfNeeded(); task.initializeIfNeeded();
} catch (final LockException e) { } catch (final LockException | TimeoutException e) {
// it is possible that if there are multiple threads within the instance that one thread // it is possible that if there are multiple threads within the instance that one thread
// trying to grab the task from the other, while the other has not released the lock since // trying to grab the task from the other, while the other has not released the lock since
// it did not participate in the rebalance. In this case we can just retry in the next iteration // it did not participate in the rebalance. In this case we can just retry in the next iteration
@ -232,7 +233,13 @@ public class TaskManager {
final Set<TopicPartition> restored = changelogReader.completedChangelogs(); final Set<TopicPartition> restored = changelogReader.completedChangelogs();
for (final Task task : restoringTasks) { for (final Task task : restoringTasks) {
if (restored.containsAll(task.changelogPartitions())) { if (restored.containsAll(task.changelogPartitions())) {
try {
task.completeRestoration(); task.completeRestoration();
} catch (final TimeoutException e) {
log.debug("Cloud complete restoration for {} due to {}; will retry", task.id(), e.toString());
allRunning = false;
}
} else { } else {
// we found a restoring task that isn't done restoring, which is evidence that // we found a restoring task that isn't done restoring, which is evidence that
// not all tasks are running // not all tasks are running

25
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java

@ -454,13 +454,12 @@ public class RecordCollectorTest {
} }
@Test @Test
public void shouldThrowStreamsExceptionOnEOSInitializeTimeout() { public void shouldRethrowOnEOSInitializeTimeout() {
final KafkaException exception = new TimeoutException("KABOOM!"); final KafkaException exception = new TimeoutException("KABOOM!");
final Properties props = StreamsTestUtils.getStreamsConfig("test"); final Properties props = StreamsTestUtils.getStreamsConfig("test");
props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
final StreamsException thrown = assertThrows(StreamsException.class, () -> final RecordCollector recordCollector = new RecordCollectorImpl(
new RecordCollectorImpl(
taskId, taskId,
new StreamsConfig(props), new StreamsConfig(props),
logContext, logContext,
@ -472,9 +471,10 @@ public class RecordCollectorTest {
throw exception; throw exception;
} }
} }
)
); );
assertEquals(exception, thrown.getCause());
final TimeoutException thrown = assertThrows(TimeoutException.class, recordCollector::initialize);
assertEquals(exception, thrown);
} }
@Test @Test
@ -483,8 +483,7 @@ public class RecordCollectorTest {
final Properties props = StreamsTestUtils.getStreamsConfig("test"); final Properties props = StreamsTestUtils.getStreamsConfig("test");
props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
final StreamsException thrown = assertThrows(StreamsException.class, () -> final RecordCollector recordCollector = new RecordCollectorImpl(
new RecordCollectorImpl(
taskId, taskId,
new StreamsConfig(props), new StreamsConfig(props),
logContext, logContext,
@ -496,8 +495,9 @@ public class RecordCollectorTest {
throw exception; throw exception;
} }
} }
)
); );
final StreamsException thrown = assertThrows(StreamsException.class, recordCollector::initialize);
assertEquals(exception, thrown.getCause()); assertEquals(exception, thrown.getCause());
} }
@ -625,6 +625,7 @@ public class RecordCollectorTest {
} }
} }
); );
collector.initialize();
assertThrows(TaskMigratedException.class, () -> collector.commit(null)); assertThrows(TaskMigratedException.class, () -> collector.commit(null));
} }
@ -646,6 +647,7 @@ public class RecordCollectorTest {
} }
} }
); );
collector.initialize();
assertThrows(TaskMigratedException.class, () -> collector.commit(Collections.emptyMap())); assertThrows(TaskMigratedException.class, () -> collector.commit(Collections.emptyMap()));
} }
@ -688,6 +690,7 @@ public class RecordCollectorTest {
} }
} }
); );
collector.initialize();
assertThrows(StreamsException.class, () -> collector.commit(null)); assertThrows(StreamsException.class, () -> collector.commit(null));
} }
@ -709,6 +712,7 @@ public class RecordCollectorTest {
} }
} }
); );
collector.initialize();
assertThrows(StreamsException.class, () -> collector.commit(Collections.emptyMap())); assertThrows(StreamsException.class, () -> collector.commit(Collections.emptyMap()));
} }
@ -780,7 +784,7 @@ public class RecordCollectorTest {
} }
} }
); );
collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
collector.commit(Collections.emptyMap()); collector.commit(Collections.emptyMap());
@ -807,7 +811,7 @@ public class RecordCollectorTest {
} }
} }
); );
collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::close); final StreamsException thrown = assertThrows(StreamsException.class, collector::close);
@ -831,6 +835,7 @@ public class RecordCollectorTest {
} }
} }
); );
collector.initialize();
// this call is to begin an inflight txn // this call is to begin an inflight txn
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner); collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

4
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@ -712,10 +712,6 @@ public class StreamThreadTest {
thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap()); thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
for (final Task task : thread.activeTasks()) {
assertTrue(((MockProducer) ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()).transactionInitialized());
}
thread.shutdown(); thread.shutdown();
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD, () -> thread.state() == StreamThread.State.DEAD,

3
streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java

@ -76,6 +76,9 @@ public class MockRecordCollector implements RecordCollector {
headers)); headers));
} }
@Override
public void initialize() {}
@Override @Override
public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) { public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
committed.add(offsets); committed.add(offsets);

Loading…
Cancel
Save