
KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close (#8900)

If there's any pending data and we haven't flushed the producer when we abort a transaction, a KafkaException is returned for the previous send. This is a bit misleading, since the situation is not an unrecoverable error, so the KafkaException is really non-fatal. For now, we should just catch and swallow this in the RecordCollector (see also: KAFKA-10169).
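As a rough illustration only (the helper name and the exact message text checked here are assumptions, not necessarily what the patch does), the swallow could look something like this inside RecordCollectorImpl:

    // Hypothetical sketch: swallow the non-fatal KafkaException reported for records
    // that were still in flight when we ourselves aborted the transaction.
    private void maybeSwallowAbortedSendException() {
        final KafkaException exception = sendException.get();
        if (exception == null) {
            return;
        }
        final String message = exception.getMessage();
        // The producer fails pending batches with a message along these lines when a
        // transaction is aborted; treat that case as non-fatal and drop it.
        if (message != null && message.contains("Failing batch since transaction was aborted")) {
            log.debug("Swallowing send exception caused by our own transaction abort", exception);
            sendException.set(null);
        } else {
            throw exception;
        }
    }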

We ended up aborting an un-flushed transaction due to the combination of:
a. always aborting the ongoing transaction when any task is closed/revoked
b. only committing (and flushing) if at least one of the revoked tasks needs to be committed (regardless of whether any non-revoked tasks have data/transaction in flight)

Given the above, we can end up with an ongoing transaction that isn't committed, since none of the revoked tasks have any data in it. We then abort that transaction anyway when those tasks are closed. So, in addition to swallowing this exception, we should avoid unnecessarily aborting data for tasks that haven't been revoked.

We can handle this by splitting the RecordCollector's close into dirty and clean flavors: if dirty, we need to abort the transaction, since it may be dirty because the commit attempt failed. But if clean, we can skip aborting the transaction, since we know that either we just committed and thus there is no ongoing transaction to abort, or else the transaction in flight contains no data from the tasks being closed.
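Condensed from the RecordCollectorImpl changes in the diff below (javadoc and the remaining close steps trimmed), the split looks roughly like:

    @Override
    public void closeClean() {
        log.info("Closing record collector clean");
        // No need to abort the transaction during a clean close: either we already committed
        // it during handleRevocation, or none of the closed tasks had data in the current
        // transaction, so there is nothing to commit or abort.
        checkForException();
    }

    @Override
    public void closeDirty() {
        log.info("Closing record collector dirty");
        if (eosEnabled) {
            // We may be closing dirty because the commit failed, so abort the transaction to be safe
            streamsProducer.abortTransaction();
        }
        // any recorded send exception is still surfaced (remainder as in the full diff below)
    }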

Note that this means we still abort the transaction any time a task is closed dirty, so we must close/reinitialize any active task with pending data (that was aborted).
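The TaskManager.shutdown refactor in the diff below follows this shape: try a clean close first, collect any tasks whose commit or close failed, and only close those dirty. In outline (simplified; producer and directory cleanup elided):

    void shutdown(final boolean clean) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        final Set<Task> tasksToCloseDirty = new HashSet<>();
        // attempt to commit and close everything cleanly; any task that cannot be
        // committed/closed clean is handed back to be closed dirty
        tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException));
        tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException));

        for (final Task task : tasksToCloseDirty) {
            closeTaskDirty(task);
        }
        // task producers, the thread producer, and unassigned task directories are
        // cleaned up afterwards (see the full diff below)
    }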

In sum:

* hackily check the KafkaException message and swallow
* only abort the transaction during a dirty close
* refactor shutdown to make sure we don't closeClean a task whose data was actually aborted

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Boyang Chen <boyang@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Author: A. Sophie Blee-Goldman (committed via GitHub)
Commit: 448e7d7f0f
Changed files:
  1. streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java (4)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java (9)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java (38)
  4. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (13)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (1)
  6. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java (10)
  7. streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java (190)
  8. streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java (8)
  9. streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java (352)
  10. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java (2)
  11. streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java (6)
  12. streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java (5)

4
streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java

@@ -26,6 +26,10 @@ public class TaskMigratedException extends StreamsException {
private final static long serialVersionUID = 1L;
public TaskMigratedException(final String message) {
super(message + "; it means all tasks belonging to this thread should be migrated.");
}
public TaskMigratedException(final String message, final Throwable throwable) {
super(message + "; it means all tasks belonging to this thread should be migrated.", throwable);
}

9
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java

@@ -57,9 +57,14 @@ public interface RecordCollector {
void flush();
/**
* Close the internal {@link Producer}.
* Clean close the internal {@link Producer}.
*/
void close();
void closeClean();
/**
* Dirty close the internal {@link Producer}.
*/
void closeDirty();
/**
* The last acked offsets from the internal {@link Producer}.

38
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
@@ -61,7 +62,7 @@ public class RecordCollectorImpl implements RecordCollector {
private final boolean eosEnabled;
private final Map<TopicPartition, Long> offsets;
private volatile KafkaException sendException;
private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);
/**
* @throws StreamsException fatal error that should cause the thread to die (from producer.initTxn)
@@ -170,7 +171,7 @@ public class RecordCollectorImpl implements RecordCollector {
streamsProducer.send(serializedRecord, (metadata, exception) -> {
// if there's already an exception record, skip logging offsets or new exceptions
if (sendException != null) {
if (sendException.get() != null) {
return;
}
@@ -195,11 +196,11 @@ public class RecordCollectorImpl implements RecordCollector {
if (isFatalException(exception)) {
errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
sendException = new StreamsException(errorMessage, exception);
sendException.set(new StreamsException(errorMessage, exception));
} else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " +
"indicating the task may be migrated out";
sendException = new TaskMigratedException(errorMessage, exception);
sendException.set(new TaskMigratedException(errorMessage, exception));
} else {
if (exception instanceof RetriableException) {
errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
@@ -210,7 +211,7 @@ public class RecordCollectorImpl implements RecordCollector {
if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
sendException = new StreamsException(errorMessage, exception);
sendException.set(new StreamsException(errorMessage, exception));
} else {
errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
droppedRecordsSensor.record();
@@ -250,10 +251,26 @@ public class RecordCollectorImpl implements RecordCollector {
* @throws TaskMigratedException recoverable error that would cause the task to be removed
*/
@Override
public void close() {
log.info("Closing record collector");
public void closeClean() {
log.info("Closing record collector clean");
// No need to abort transaction during a clean close: either we have successfully committed the ongoing
// transaction during handleRevocation and thus there is no transaction in flight, or else none of the revoked
// tasks had any data in the current transaction and therefore there is no need to commit or abort it.
checkForException();
}
/**
* @throws StreamsException fatal error that should cause the thread to die
* @throws TaskMigratedException recoverable error that would cause the task to be removed
*/
@Override
public void closeDirty() {
log.info("Closing record collector dirty");
if (eosEnabled) {
// We may be closing dirty because the commit failed, so we must abort the transaction to be safe
streamsProducer.abortTransaction();
}
@@ -266,8 +283,11 @@ public class RecordCollectorImpl implements RecordCollector {
}
private void checkForException() {
if (sendException != null) {
throw sendException;
final KafkaException exception = sendException.get();
if (exception != null) {
sendException.set(null);
throw exception;
}
}

13
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@@ -487,7 +487,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
switch (state()) {
case SUSPENDED:
stateMgr.recycle();
recordCollector.close();
recordCollector.closeClean();
break;
@@ -520,9 +520,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
*/
private void close(final boolean clean) {
if (clean && commitNeeded) {
// It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
// closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
+ " commit and should close as dirty instead");
throw new StreamsException("Tried to close dirty task as clean");
throw new TaskMigratedException("Tried to close dirty task as clean");
}
switch (state()) {
case SUSPENDED:
@@ -542,7 +544,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
"state manager close",
log);
TaskManager.executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
TaskManager.executeAndMaybeSwallow(
clean,
clean ? recordCollector::closeClean : recordCollector::closeDirty,
"record collector close",
log
);
break;

1
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@@ -340,7 +340,6 @@ public class StreamThread extends Thread {
changelogReader,
processId,
logPrefix,
streamsMetrics,
activeTaskCreator,
standbyTaskCreator,
builder,

10
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java

@@ -152,9 +152,9 @@ public class StreamsMetadataState {
}
if (globalStores.contains(storeName)) {
// global stores are on every node. if we dont' have the host info
// global stores are on every node. if we don't have the host info
// for this host then just pick the first metadata
if (thisHost == UNKNOWN_HOST) {
if (thisHost.equals(UNKNOWN_HOST)) {
return allMetadata.get(0);
}
return localMetadata;
@@ -221,9 +221,9 @@ public class StreamsMetadataState {
}
if (globalStores.contains(storeName)) {
// global stores are on every node. if we dont' have the host info
// global stores are on every node. if we don't have the host info
// for this host then just pick the first metadata
if (thisHost == UNKNOWN_HOST) {
if (thisHost.equals(UNKNOWN_HOST)) {
return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
}
return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
@@ -265,7 +265,7 @@ public class StreamsMetadataState {
if (globalStores.contains(storeName)) {
// global stores are on every node. if we don't have the host info
// for this host then just pick the first metadata
if (thisHost == UNKNOWN_HOST) {
if (thisHost.equals(UNKNOWN_HOST)) {
return allMetadata.get(0);
}
return localMetadata;

190
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@@ -35,7 +35,6 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
@@ -69,7 +68,6 @@ public class TaskManager {
private final ChangelogReader changelogReader;
private final UUID processId;
private final String logPrefix;
private final StreamsMetricsImpl streamsMetrics;
private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
private final InternalTopologyBuilder builder;
@@ -93,7 +91,6 @@ public class TaskManager {
TaskManager(final ChangelogReader changelogReader,
final UUID processId,
final String logPrefix,
final StreamsMetricsImpl streamsMetrics,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator,
final InternalTopologyBuilder builder,
@@ -103,7 +100,6 @@ public class TaskManager {
this.changelogReader = changelogReader;
this.processId = processId;
this.logPrefix = logPrefix;
this.streamsMetrics = streamsMetrics;
this.activeTaskCreator = activeTaskCreator;
this.standbyTaskCreator = standbyTaskCreator;
this.builder = builder;
@@ -679,92 +675,166 @@ public class TaskManager {
void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
final Set<Task> tasksToClose = new HashSet<>();
final Set<Task> tasksToCloseDirty = new HashSet<>();
tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException));
tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException));
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task);
}
for (final Task task : activeTaskIterable()) {
executeAndMaybeSwallow(
clean,
() -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
);
}
executeAndMaybeSwallow(
clean,
activeTaskCreator::closeThreadProducerIfNeeded,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing thread producer.", e)
);
tasks.clear();
// this should be called after closing all tasks, to make sure we unlock the task dir for tasks that may
// have still been in CREATED at the time of shutdown, since Task#close will not do so
executeAndMaybeSwallow(
clean,
this::releaseLockedUnassignedTaskDirectories,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
);
final RuntimeException fatalException = firstException.get();
if (fatalException != null) {
throw new RuntimeException("Unexpected exception while closing task", fatalException);
}
}
// Returns the set of active tasks that must be closed dirty
private Collection<Task> tryCloseCleanAllActiveTasks(final boolean clean,
final AtomicReference<RuntimeException> firstException) {
if (!clean) {
return activeTaskIterable();
}
final Set<Task> tasksToCloseDirty = new HashSet<>();
final Set<Task> tasksToCloseClean = new HashSet<>();
final Set<Task> tasksToCommit = new HashSet<>();
final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
for (final Task task : tasks.values()) {
if (clean) {
try {
task.suspend();
if (task.commitNeeded()) {
tasksToCommit.add(task);
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
if (task.isActive()) {
consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
}
}
tasksToClose.add(task);
} catch (final TaskMigratedException e) {
// just ignore the exception as it doesn't matter during shutdown
closeTaskDirty(task);
} catch (final RuntimeException e) {
firstException.compareAndSet(null, e);
closeTaskDirty(task);
for (final Task task : activeTaskIterable()) {
try {
task.suspend();
if (task.commitNeeded()) {
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
tasksToCommit.add(task);
consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
}
} else {
closeTaskDirty(task);
tasksToCloseClean.add(task);
} catch (final TaskMigratedException e) {
// just ignore the exception as it doesn't matter during shutdown
tasksToCloseDirty.add(task);
} catch (final RuntimeException e) {
firstException.compareAndSet(null, e);
tasksToCloseDirty.add(task);
}
}
try {
if (clean) {
// If any active tasks can't be committed, none of them can be, and all that need a commit must be closed dirty
if (!tasksToCloseDirty.isEmpty()) {
tasksToCloseClean.removeAll(tasksToCommit);
tasksToCloseDirty.addAll(tasksToCommit);
} else {
try {
commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
for (final Task task : tasksToCommit) {
try {
task.postCommit();
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
firstException.compareAndSet(null, e);
tasksToCloseDirty.add(task);
tasksToCloseClean.remove(task);
}
}
} catch (final RuntimeException e) {
log.error("Exception caught while committing tasks during shutdown", e);
firstException.compareAndSet(null, e);
// If the commit fails, everyone who participated in it must be closed dirty
tasksToCloseClean.removeAll(tasksToCommit);
tasksToCloseDirty.addAll(tasksToCommit);
}
} catch (final RuntimeException e) {
log.error("Exception caught while committing tasks during shutdown", e);
firstException.compareAndSet(null, e);
}
for (final Task task : tasksToClose) {
for (final Task task : tasksToCloseClean) {
try {
completeTaskCloseClean(task);
} catch (final RuntimeException e) {
log.error("Exception caught while clean-closing task " + task.id(), e);
firstException.compareAndSet(null, e);
closeTaskDirty(task);
tasksToCloseDirty.add(task);
}
}
for (final Task task : tasks.values()) {
if (task.isActive()) {
executeAndMaybeSwallow(
clean,
() -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
);
}
}
return tasksToCloseDirty;
}
tasks.clear();
// Returns the set of standby tasks that must be closed dirty
private Collection<Task> tryCloseCleanAllStandbyTasks(final boolean clean,
final AtomicReference<RuntimeException> firstException) {
if (!clean) {
return standbyTaskIterable();
}
final Set<Task> tasksToCloseDirty = new HashSet<>();
final Set<Task> tasksToCloseClean = new HashSet<>();
final Set<Task> tasksToCommit = new HashSet<>();
executeAndMaybeSwallow(
clean,
activeTaskCreator::closeThreadProducerIfNeeded,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing thread producer.", e)
);
for (final Task task : standbyTaskIterable()) {
try {
task.suspend();
if (task.commitNeeded()) {
task.prepareCommit();
tasksToCommit.add(task);
}
tasksToCloseClean.add(task);
} catch (final TaskMigratedException e) {
// just ignore the exception as it doesn't matter during shutdown
tasksToCloseDirty.add(task);
} catch (final RuntimeException e) {
firstException.compareAndSet(null, e);
tasksToCloseDirty.add(task);
}
}
try {
// this should be called after closing all tasks, to make sure we unlock the task dir for tasks that may
// have still been in CREATED at the time of shutdown, since Task#close will not do so
releaseLockedUnassignedTaskDirectories();
} catch (final RuntimeException e) {
firstException.compareAndSet(null, e);
for (final Task task : tasksToCommit) {
try {
task.postCommit();
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing standby task " + task.id(), e);
firstException.compareAndSet(null, e);
tasksToCloseDirty.add(task);
tasksToCloseClean.remove(task);
}
}
final RuntimeException fatalException = firstException.get();
if (fatalException != null) {
throw new RuntimeException("Unexpected exception while closing task", fatalException);
for (final Task task : tasksToCloseClean) {
try {
completeTaskCloseClean(task);
} catch (final RuntimeException e) {
log.error("Exception caught while clean-closing standby task " + task.id(), e);
firstException.compareAndSet(null, e);
tasksToCloseDirty.add(task);
}
}
return tasksToCloseDirty;
}
Set<TaskId> activeTaskIds() {
@@ -801,6 +871,10 @@ public class TaskManager {
return standbyTaskStream().collect(Collectors.toMap(Task::id, t -> t));
}
private List<Task> standbyTaskIterable() {
return standbyTaskStream().collect(Collectors.toList());
}
private Stream<Task> standbyTaskStream() {
return tasks.values().stream().filter(t -> !t.isActive());
}

8
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java

@@ -359,7 +359,9 @@ public class RestoreIntegrationTest {
waitForStandbyCompletion(client1, 1, 30 * 1000L);
waitForStandbyCompletion(client2, 1, 30 * 1000L);
assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(0));
// Sometimes the store happens to have already been closed sometime during startup, so just keep track
// of where it started and make sure it doesn't happen more times from there
final int initialStoreCloseCount = CloseCountingInMemoryStore.numStoresClosed();
assertThat(restoreListener.totalNumRestored(), CoreMatchers.equalTo(0L));
client2.close();
@@ -374,12 +376,12 @@ public class RestoreIntegrationTest {
// After stopping instance 2 and letting instance 1 take over its tasks, we should have closed just two stores
// total: the active and standby tasks on instance 2
assertThat(CloseCountingInMemoryStore.numStoresClosed(), equalTo(2));
assertThat(CloseCountingInMemoryStore.numStoresClosed(), equalTo(initialStoreCloseCount + 2));
client1.close();
waitForApplicationState(singletonList(client2), State.NOT_RUNNING, Duration.ofSeconds(60));
assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(4));
assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4));
}
private static KeyValueBytesStoreSupplier getCloseCountingStore(final String name) {

352
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java

@@ -141,7 +141,7 @@ public class RecordCollectorTest {
@After
public void cleanup() {
collector.close();
collector.closeClean();
}
@Test
@@ -299,7 +299,25 @@ public class RecordCollectorTest {
}
@Test
public void shouldAbortTxIfEosEnabled() {
public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
expect(streamsProducer.eosEnabled()).andReturn(true);
replay(streamsProducer);
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
streamsProducer,
productionExceptionHandler,
streamsMetrics);
collector.closeClean();
verify(streamsProducer);
}
@Test
public void shouldAbortTxOnCloseDirtyIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
expect(streamsProducer.eosEnabled()).andReturn(true);
streamsProducer.abortTransaction();
@@ -312,7 +330,7 @@ public class RecordCollectorTest {
productionExceptionHandler,
streamsMetrics);
collector.close();
collector.closeDirty();
verify(streamsProducer);
}
@@ -430,30 +448,12 @@ public class RecordCollectorTest {
}
@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedInCallback() {
public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenProducerFencedInCallback() {
final KafkaException exception = new ProducerFencedException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
new StreamsProducer(
eosConfig,
"threadId",
new MockClientSupplier() {
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
callback.onCompletion(null, exception);
return null;
}
};
}
},
taskId,
null,
logContext
),
getExceptionalStreamsProducer(exception),
productionExceptionHandler,
streamsMetrics
);
@@ -461,72 +461,84 @@ public class RecordCollectorTest {
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
TaskMigratedException thrown = assertThrows(
final TaskMigratedException thrown = assertThrows(
TaskMigratedException.class, () ->
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
" indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
"\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
" indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
);
}
@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenProducerFencedInCallback() {
final KafkaException exception = new ProducerFencedException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducer(exception),
productionExceptionHandler,
streamsMetrics
);
collector.initialize();
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
thrown = assertThrows(TaskMigratedException.class, collector::flush);
final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, collector::flush);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
" indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
"\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
" indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
);
}
@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenProducerFencedInCallback() {
final KafkaException exception = new ProducerFencedException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducer(exception),
productionExceptionHandler,
streamsMetrics
);
collector.initialize();
thrown = assertThrows(TaskMigratedException.class, collector::close);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, collector::closeClean);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
" indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
"\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
" indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
);
}
@Test
public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultExceptionHandler() {
public void shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler() {
final KafkaException exception = new KafkaException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
new StreamsProducer(
config,
"threadId",
new MockClientSupplier() {
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
callback.onCompletion(null, exception);
return null;
}
};
}
},
null,
null,
logContext
),
getExceptionalStreamsProducer(exception),
productionExceptionHandler,
streamsMetrics
);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
StreamsException thrown = assertThrows(
final StreamsException thrown = assertThrows(
StreamsException.class,
() -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
);
@@ -537,23 +549,123 @@ public class RecordCollectorTest {
"\norg.apache.kafka.common.KafkaException: KABOOM!" +
"\nException handler choose to FAIL the processing, no more records would be sent.")
);
}
@Test
public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsWithDefaultExceptionHandler() {
final KafkaException exception = new KafkaException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducer(exception),
productionExceptionHandler,
streamsMetrics
);
thrown = assertThrows(StreamsException.class, collector::flush);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::flush);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.KafkaException: KABOOM!" +
"\nException handler choose to FAIL the processing, no more records would be sent.")
"\norg.apache.kafka.common.KafkaException: KABOOM!" +
"\nException handler choose to FAIL the processing, no more records would be sent.")
);
}
thrown = assertThrows(StreamsException.class, collector::close);
@Test
public void shouldThrowStreamsExceptionOnSubsequentCloseIfASendFailsWithDefaultExceptionHandler() {
final KafkaException exception = new KafkaException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducer(exception),
productionExceptionHandler,
streamsMetrics
);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.KafkaException: KABOOM!" +
"\nException handler choose to FAIL the processing, no more records would be sent.")
"\norg.apache.kafka.common.KafkaException: KABOOM!" +
"\nException handler choose to FAIL the processing, no more records would be sent.")
);
}
@Test
public void shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler() {
final KafkaException exception = new AuthenticationException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducer(exception),
new AlwaysContinueProductionExceptionHandler(),
streamsMetrics
);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
final StreamsException thrown = assertThrows(
StreamsException.class,
() -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
);
}
@Test
public void shouldThrowStreamsExceptionOnSubsequentFlushIfFatalEvenWithContinueExceptionHandler() {
final KafkaException exception = new AuthenticationException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducer(exception),
new AlwaysContinueProductionExceptionHandler(),
streamsMetrics
);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::flush);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
);
}
@Test
public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueExceptionHandler() {
final KafkaException exception = new AuthenticationException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
getExceptionalStreamsProducer(exception),
new AlwaysContinueProductionExceptionHandler(),
streamsMetrics
);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
);
}
@@ -562,25 +674,7 @@ public class RecordCollectorTest {
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
new StreamsProducer(
config,
"threadId",
new MockClientSupplier() {
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
callback.onCompletion(null, new Exception());
return null;
}
};
}
},
null,
null,
logContext
),
getExceptionalStreamsProducer(new Exception()),
new AlwaysContinueProductionExceptionHandler(),
streamsMetrics
);
@@ -616,73 +710,11 @@ public class RecordCollectorTest {
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
collector.flush();
collector.close();
}
@Test
public void shouldThrowStreamsExceptionOnSubsequentCallIfFatalEvenWithContinueExceptionHandler() {
final KafkaException exception = new AuthenticationException("KABOOM!");
final RecordCollector collector = new RecordCollectorImpl(
logContext,
taskId,
new StreamsProducer(
config,
"threadId",
new MockClientSupplier() {
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
callback.onCompletion(null, exception);
return null;
}
};
}
},
null,
null,
logContext
),
new AlwaysContinueProductionExceptionHandler(),
streamsMetrics
);
collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
StreamsException thrown = assertThrows(
StreamsException.class,
() -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
);
thrown = assertThrows(StreamsException.class, collector::flush);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
);
thrown = assertThrows(StreamsException.class, collector::close);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
"\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
"\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
);
collector.closeClean();
}
@Test
public void shouldNotAbortTxnOnEOSCloseIfNothingSent() {
public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() {
final AtomicBoolean functionCalled = new AtomicBoolean(false);
final RecordCollector collector = new RecordCollectorImpl(
logContext,
@@ -709,7 +741,7 @@ public class RecordCollectorTest {
streamsMetrics
);
collector.close();
collector.closeDirty();
assertFalse(functionCalled.get());
}
@@ -774,7 +806,7 @@ public class RecordCollectorTest {
streamsMetrics
);
collector.close();
collector.closeClean();
// Flush should not throw as producer is still alive.
streamsProducer.flush();
@@ -782,12 +814,34 @@ public class RecordCollectorTest {
@Test
public void shouldNotCloseInternalProducerForNonEOS() {
collector.close();
collector.closeClean();
// Flush should not throw as producer is still alive.
streamsProducer.flush();
}
private StreamsProducer getExceptionalStreamsProducer(final Exception exception) {
return new StreamsProducer(
config,
"threadId",
new MockClientSupplier() {
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
callback.onCompletion(null, exception);
return null;
}
};
}
},
null,
null,
logContext
);
}
private static class CustomStringSerializer extends StringSerializer {
private boolean isKey;

2
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

@@ -1801,7 +1801,7 @@ public class StreamTaskTest {
@Test
public void shouldOnlyRecycleSuspendedTasks() {
stateManager.recycle();
recordCollector.close();
recordCollector.closeClean();
EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig(false, "100"), true);

6
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@@ -33,17 +33,14 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.easymock.EasyMock;
@@ -166,13 +163,10 @@ public class TaskManagerTest {
}
private void setUpTaskManager(final StreamThread.ProcessingMode processingMode) {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
taskManager = new TaskManager(
changeLogReader,
UUID.randomUUID(),
"taskManagerTest",
streamsMetrics,
activeTaskCreator,
standbyTaskCreator,
topologyBuilder,

5
streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java

@@ -81,7 +81,10 @@ public class MockRecordCollector implements RecordCollector {
}
@Override
public void close() {}
public void closeClean() {}
@Override
public void closeDirty() {}
@Override
public Map<TopicPartition, Long> offsets() {
