diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index fe3e9d6481f..4164f8a5823 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -749,7 +750,7 @@ public class KafkaProducerTest {
     public void testInitTransactionTimeout() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
-        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 500);
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

         Time time = new MockTime(1);
@@ -761,7 +762,18 @@ public class KafkaProducerTest {

         try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                 metadata, client, null, time)) {
+            client.prepareResponse(
+                request -> request instanceof FindCoordinatorRequest &&
+                    ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
+                FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
+
             assertThrows(TimeoutException.class, producer::initTransactions);
+
+            client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
+            client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+            // retry initialization should work
+            producer.initTransactions();
         }
     }

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 78724f62d7e..5e8a0731591 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -47,6 +47,13 @@ public interface RecordCollector extends AutoCloseable {

     void commit(final Map<TopicPartition, OffsetAndMetadata> offsets);

+    /**
+     * Initialize the internal {@link Producer}; note this function should be made idempotent
+     *
+     * @throws org.apache.kafka.common.errors.TimeoutException if producer initializing txn id timed out
+     */
+    void initialize();
+
     /**
      * Flush the internal {@link Producer}.
     */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index a6399858fa7..e84197becc0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -70,6 +71,7 @@ public class RecordCollectorImpl implements RecordCollector {

     // used when eosEnabled is true only
     private boolean transactionInFlight = false;
+    private boolean transactionInitialized = false;
     private Producer<byte[], byte[]> producer;
     private volatile KafkaException sendException;

@@ -95,24 +97,30 @@ public class RecordCollectorImpl implements RecordCollector {
         this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), streamsMetrics);

         producer = producerSupplier.get(taskId);
+    }

+    @Override
+    public void initialize() {
         maybeInitTxns();
     }

     private void maybeInitTxns() {
-        if (eosEnabled) {
+        if (eosEnabled && !transactionInitialized) {
             // initialize transactions if eos is turned on, which will block if the previous transaction has not
             // completed yet; do not start the first transaction until the topology has been initialized later
             try {
                 producer.initTransactions();
+
+                transactionInitialized = true;
             } catch (final TimeoutException exception) {
-                final String errorMessage = "Timeout exception caught when initializing transactions for task " + taskId + ". " +
+                log.warn("Timeout exception caught when initializing transactions for task {}. " +
                     "\nThe broker is either slow or in bad state (like not having enough replicas) in responding to the request, " +
                     "or the connection to broker was interrupted sending the request or receiving the response. " +
-                    "\n Consider overwriting `max.block.ms` to a larger value to avoid timeout errors";
+                    "Will retry initializing the task in the next loop." +
+ + "\nConsider overwriting producer config {} to a larger value to avoid timeout errors", + ProducerConfig.MAX_BLOCK_MS_CONFIG, taskId); - // TODO K9113: we do NOT try to handle timeout exception here but throw it as a fatal error - throw new StreamsException(errorMessage, exception); + throw exception; } catch (final KafkaException exception) { throw new StreamsException("Error encountered while initializing transactions for task " + taskId, exception); } @@ -163,7 +171,7 @@ public class RecordCollectorImpl implements RecordCollector { } catch (final ProducerFencedException error) { throw new TaskMigratedException(taskId, "Producer get fenced trying to commit a transaction", error); } catch (final TimeoutException error) { - // TODO K9113: currently handle timeout exception as a fatal error, should discuss whether we want to handle it + // TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level throw new StreamsException("Timed out while committing transaction via producer for task " + taskId, error); } catch (final KafkaException error) { throw new StreamsException("Error encountered sending offsets and committing transaction " + @@ -176,7 +184,7 @@ public class RecordCollectorImpl implements RecordCollector { throw new TaskMigratedException(taskId, "Consumer committing offsets failed, " + "indicating the corresponding thread is no longer part of the group.", error); } catch (final TimeoutException error) { - // TODO K9113: currently handle timeout exception as a fatal error + // TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level throw new StreamsException("Timed out while committing offsets via consumer for task " + taskId, error); } catch (final KafkaException error) { throw new StreamsException("Error encountered committing offsets via consumer for task " + taskId, error); @@ -244,9 +252,16 @@ public class RecordCollectorImpl implements RecordCollector { final StreamPartitioner partitioner) { final Integer partition; - // TODO K9113: we need to decide how to handle exceptions from partitionsFor if (partitioner != null) { - final List partitions = producer.partitionsFor(topic); + final List partitions; + try { + partitions = producer.partitionsFor(topic); + } catch (final KafkaException e) { + // here we cannot drop the message on the floor even if it is a transient timeout exception, + // so we treat everything the same as a fatal exception + throw new StreamsException("Could not determine the number of partitions for topic '" + topic + + "' for task " + taskId + " due to " + e.toString()); + } if (partitions.size() > 0) { partition = partitioner.partition(topic, key, value, partitions.size()); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 06832921345..13a3bc9b88b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; @@ -186,11 +187,14 @@ public class StreamTask extends AbstractTask implements 

     /**
      * @throws LockException could happen when multi-threads within the single instance, could retry
+     * @throws TimeoutException if initializing record collector timed out
      * @throws StreamsException fatal error, should close the thread
      */
     @Override
     public void initializeIfNeeded() {
         if (state() == State.CREATED) {
+            recordCollector.initialize();
+
             StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);

             transitionTo(State.RESTORING);
@@ -199,6 +203,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         }
     }

+    /**
+     * @throws TimeoutException if fetching committed offsets timed out
+     */
     @Override
     public void completeRestoration() {
         if (state() == State.RESTORING) {
@@ -612,6 +619,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 .filter(e -> e.getValue() != null)
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
             initializeTaskTime(offsetsAndMetadata);
+        } catch (final TimeoutException e) {
+            log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop." +
+                "\nConsider overwriting consumer config {} to a larger value to avoid timeout errors",
+                e.toString(), ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+
+            throw e;
         } catch (final KafkaException e) {
             throw new StreamsException(format("task [%s] Failed to initialize offsets for %s", id, partitions), e);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4b151ce208d..49b673ba98a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -214,7 +215,7 @@ public class TaskManager {
             if (task.state() == CREATED) {
                 try {
                     task.initializeIfNeeded();
-                } catch (final LockException e) {
+                } catch (final LockException | TimeoutException e) {
                     // it is possible that if there are multiple threads within the instance that one thread
                     // trying to grab the task from the other, while the other has not released the lock since
                     // it did not participate in the rebalance. In this case we can just retry in the next iteration
@@ -232,7 +233,13 @@
         final Set<TopicPartition> restored = changelogReader.completedChangelogs();
         for (final Task task : restoringTasks) {
             if (restored.containsAll(task.changelogPartitions())) {
-                task.completeRestoration();
+                try {
+                    task.completeRestoration();
+                } catch (final TimeoutException e) {
+                    log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), e.toString());
+
+                    allRunning = false;
+                }
             } else {
                 // we found a restoring task that isn't done restoring, which is evidence that
                 // not all tasks are running
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 50f955d7e98..7d4f1320a2d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -454,27 +454,27 @@ public class RecordCollectorTest {
     }

     @Test
-    public void shouldThrowStreamsExceptionOnEOSInitializeTimeout() {
+    public void shouldRethrowOnEOSInitializeTimeout() {
         final KafkaException exception = new TimeoutException("KABOOM!");
         final Properties props = StreamsTestUtils.getStreamsConfig("test");
         props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

-        final StreamsException thrown = assertThrows(StreamsException.class, () ->
-            new RecordCollectorImpl(
-                taskId,
-                new StreamsConfig(props),
-                logContext,
-                streamsMetrics,
-                null,
-                id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
-                    @Override
-                    public void initTransactions() {
-                        throw exception;
-                    }
+        final RecordCollector recordCollector = new RecordCollectorImpl(
+            taskId,
+            new StreamsConfig(props),
+            logContext,
+            streamsMetrics,
+            null,
+            id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                @Override
+                public void initTransactions() {
+                    throw exception;
                 }
-            )
+            }
         );
-        assertEquals(exception, thrown.getCause());
+
+        final TimeoutException thrown = assertThrows(TimeoutException.class, recordCollector::initialize);
+        assertEquals(exception, thrown);
     }

     @Test
@@ -483,21 +483,21 @@
         final KafkaException exception = new KafkaException("KABOOM!");
         final Properties props = StreamsTestUtils.getStreamsConfig("test");
         props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

-        final StreamsException thrown = assertThrows(StreamsException.class, () ->
-            new RecordCollectorImpl(
-                taskId,
-                new StreamsConfig(props),
-                logContext,
-                streamsMetrics,
-                null,
-                id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
-                    @Override
-                    public void initTransactions() {
-                        throw exception;
-                    }
+        final RecordCollector recordCollector = new RecordCollectorImpl(
+            taskId,
+            new StreamsConfig(props),
+            logContext,
+            streamsMetrics,
+            null,
+            id -> new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                @Override
+                public void initTransactions() {
+                    throw exception;
                 }
-            )
+            }
         );
+
+        final StreamsException thrown = assertThrows(StreamsException.class, recordCollector::initialize);
         assertEquals(exception, thrown.getCause());
     }

@@ -625,6 +625,7 @@
                 }
             }
         );
+        collector.initialize();

         assertThrows(TaskMigratedException.class, () ->
            collector.commit(null));
     }

@@ -646,6 +647,7 @@
                 }
             }
         );
+        collector.initialize();

         assertThrows(TaskMigratedException.class, () -> collector.commit(Collections.emptyMap()));
     }

@@ -688,6 +690,7 @@
                 }
             }
         );
+        collector.initialize();

         assertThrows(StreamsException.class, () -> collector.commit(null));
     }

@@ -709,6 +712,7 @@
                 }
             }
         );
+        collector.initialize();

         assertThrows(StreamsException.class, () -> collector.commit(Collections.emptyMap()));
     }

@@ -780,7 +784,7 @@
                 }
             }
         );
-
+        collector.initialize();
         collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
         collector.commit(Collections.emptyMap());

@@ -807,7 +811,7 @@
                 }
             }
         );
-
+        collector.initialize();
         collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);

         final StreamsException thrown = assertThrows(StreamsException.class, collector::close);
@@ -831,6 +835,7 @@
                 }
             }
         );
+        collector.initialize();

         // this call is to begin an inflight txn
         collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9d3052fe0d3..520718595c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -712,10 +712,6 @@ public class StreamThreadTest {
         thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);

-        for (final Task task : thread.activeTasks()) {
-            assertTrue(((MockProducer) ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()).transactionInitialized());
-        }
-
         thread.shutdown();
         TestUtils.waitForCondition(
             () -> thread.state() == StreamThread.State.DEAD,
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
index a0fdb72d4f2..e34fab12657 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
@@ -76,6 +76,9 @@ public class MockRecordCollector implements RecordCollector {
                                     headers));
     }

+    @Override
+    public void initialize() {}
+
     @Override
     public void commit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         committed.add(offsets);