Browse Source

KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (#8058)

1. Removed task field from TaskMigrated; the only caller that encodes a task id from StreamTask actually do not throw so we only log it. To handle it on StreamThread we just always enforce rebalance (and we would call onPartitionsLost to remove all tasks as dirty).

2. Added TaskCorruptedException with a set of task-ids. The first scenario of this is the restoreConsumer.poll which throws InvalidOffset indicating that the logs are truncated / compacted. To handle it on StreamThread we first close the corresponding tasks as dirty (if EOS is enabled we would also wipe out the state stores), and then revive them into the CREATED state.

3. Also fixed a bug while investigating KAFKA-9572: when suspending / closing a restoring task we should not commit the new offsets but only updating the checkpoint file.

4. Re-enabled the unit test.
pull/8149/head
Guozhang Wang 5 years ago committed by GitHub
parent
commit
3b6573c150
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      clients/src/main/java/org/apache/kafka/common/utils/FixedOrderMap.java
  2. 14
      clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java
  3. 46
      streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
  4. 50
      streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
  5. 21
      streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
  6. 26
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
  7. 10
      streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
  8. 23
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
  9. 33
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
  10. 83
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  11. 37
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  12. 49
      streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
  13. 102
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
  14. 44
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
  15. 6
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
  16. 7
      streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

6
clients/src/main/java/org/apache/kafka/common/utils/FixedOrderMap.java

@ -50,12 +50,6 @@ public final class FixedOrderMap<K, V> extends LinkedHashMap<K, V> { @@ -50,12 +50,6 @@ public final class FixedOrderMap<K, V> extends LinkedHashMap<K, V> {
throw new UnsupportedOperationException("Removing from registeredStores is not allowed");
}
@Deprecated
@Override
public void clear() {
throw new UnsupportedOperationException("Removing from registeredStores is not allowed");
}
@Override
public FixedOrderMap<K, V> clone() {
throw new UnsupportedOperationException();

14
clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java

@ -69,18 +69,4 @@ public class FixedOrderMapTest { @@ -69,18 +69,4 @@ public class FixedOrderMapTest {
}
assertThat(map.get("a"), is(0));
}
@SuppressWarnings("deprecation")
@Test
public void shouldForbidConditionalClear() {
final FixedOrderMap<String, Integer> map = new FixedOrderMap<>();
map.put("a", 0);
try {
map.clear();
fail("expected exception");
} catch (final RuntimeException e) {
assertThat(e, CoreMatchers.instanceOf(UnsupportedOperationException.class));
}
assertThat(map.get("a"), is(0));
}
}

46
streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java

@ -0,0 +1,46 @@ @@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Map;
import java.util.Set;
/**
* Indicates a specific task is corrupted and need to be re-initialized. It can be thrown when
*
* 1) Under EOS, if the checkpoint file does not contain offsets for corresponding store's changelogs, meaning
* previously it was not close cleanly;
* 2) Out-of-range exception thrown during restoration, meaning that the changelog has been modified and we re-bootstrap
* the store.
*/
public class TaskCorruptedException extends StreamsException {
private final Map<TaskId, Set<TopicPartition>> taskWithChangelogs;
public TaskCorruptedException(final Map<TaskId, Set<TopicPartition>> taskWithChangelogs) {
super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized");
this.taskWithChangelogs = taskWithChangelogs;
}
public Map<TaskId, Set<TopicPartition>> corruptedTaskWithChangelogs() {
return taskWithChangelogs;
}
}

50
streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java

@ -17,58 +17,16 @@ @@ -17,58 +17,16 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
/**
* Indicates that one or more tasks got migrated to another thread.
*
* 1) if the task field is specified, then that single task should be cleaned up and closed as "zombie" while the
* thread can continue as normal;
* 2) if no tasks are specified (i.e. taskId == null), it means that the hosted thread has been fenced and all
* tasks are migrated, in which case the thread should rejoin the group
* Indicates that all tasks belongs to the thread have migrated to another thread. This exception can be thrown when
* the thread gets fenced (either by the consumer coordinator or by the transaction coordinator), which means it is
* no longer part of the group but a "zombie" already
*/
public class TaskMigratedException extends StreamsException {
private final static long serialVersionUID = 1L;
private final TaskId taskId;
public TaskMigratedException(final TaskId taskId,
final TopicPartition topicPartition,
final long endOffset,
final long pos) {
this(taskId, String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
topicPartition,
endOffset,
pos), null);
}
public TaskMigratedException(final TaskId taskId) {
this(taskId, String.format("Task %s is unexpectedly closed during processing", taskId), null);
}
public TaskMigratedException(final TaskId taskId,
final Throwable throwable) {
this(taskId, String.format("Client request for task %s has been fenced due to a rebalance", taskId), throwable);
}
public TaskMigratedException(final TaskId taskId,
final String message,
final Throwable throwable) {
super(message, throwable);
this.taskId = taskId;
}
public TaskMigratedException(final String message, final Throwable throwable) {
this(null, message + " It means all tasks belonging to this thread have been migrated", throwable);
}
public TaskId migratedTaskId() {
return taskId;
}
public TaskMigratedException() {
this(null, "A task has been migrated unexpectedly", null);
super(message + "; It means all tasks belonging to this thread should be migrated", throwable);
}
}

21
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java

@ -20,8 +20,10 @@ import org.apache.kafka.common.TopicPartition; @@ -20,8 +20,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Collection;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
public abstract class AbstractTask implements Task {
@ -55,6 +57,16 @@ public abstract class AbstractTask implements Task { @@ -55,6 +57,16 @@ public abstract class AbstractTask implements Task {
return partitions;
}
@Override
public Collection<TopicPartition> changelogPartitions() {
return stateMgr.changelogPartitions();
}
@Override
public void markChangelogAsCorrupted(final Set<TopicPartition> partitions) {
stateMgr.markChangelogAsCorrupted(partitions);
}
@Override
public StateStore getStore(final String name) {
return stateMgr.getStore(name);
@ -70,6 +82,15 @@ public abstract class AbstractTask implements Task { @@ -70,6 +82,15 @@ public abstract class AbstractTask implements Task {
return state;
}
@Override
public void revive() {
if (state == CLOSED) {
transitionTo(CREATED);
} else {
throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id);
}
}
final void transitionTo(final Task.State newState) {
final State oldState = state();

26
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

@ -37,6 +37,7 @@ import java.util.Collection; @@ -37,6 +37,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.String.format;
@ -84,11 +85,15 @@ public class ProcessorStateManager implements StateManager { @@ -84,11 +85,15 @@ public class ProcessorStateManager implements StateManager {
// update blindly with the given offset
private Long offset;
// corrupted state store should not be included in checkpointing
private boolean corrupted;
private StateStoreMetadata(final StateStore stateStore) {
this.stateStore = stateStore;
this.restoreCallback = null;
this.recordConverter = null;
this.changelogPartition = null;
this.corrupted = false;
this.offset = null;
}
@ -282,6 +287,20 @@ public class ProcessorStateManager implements StateManager { @@ -282,6 +287,20 @@ public class ProcessorStateManager implements StateManager {
return changelogOffsets().keySet();
}
void markChangelogAsCorrupted(final Set<TopicPartition> partitions) {
for (final StateStoreMetadata storeMetadata : stores.values()) {
if (partitions.contains(storeMetadata.changelogPartition)) {
storeMetadata.corrupted = true;
partitions.remove(storeMetadata.changelogPartition);
}
}
if (!partitions.isEmpty()) {
throw new IllegalStateException("Some partitions " + partitions + " are not contained in the store list of task " +
taskId + " marking as corrupted, this is not expected");
}
}
@Override
public Map<TopicPartition, Long> changelogOffsets() {
// return the current offsets for those logged stores
@ -415,6 +434,8 @@ public class ProcessorStateManager implements StateManager { @@ -415,6 +434,8 @@ public class ProcessorStateManager implements StateManager {
log.error("Failed to close state store {}: ", store.name(), exception);
}
}
stores.clear();
}
if (firstException != null) {
@ -439,10 +460,11 @@ public class ProcessorStateManager implements StateManager { @@ -439,10 +460,11 @@ public class ProcessorStateManager implements StateManager {
final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>();
for (final StateStoreMetadata storeMetadata : stores.values()) {
// store is logged, persistent, and has a valid current offset
// store is logged, persistent, not corrupted, and has a valid current offset
if (storeMetadata.changelogPartition != null &&
storeMetadata.stateStore.persistent() &&
storeMetadata.offset != null) {
storeMetadata.offset != null &&
!storeMetadata.corrupted) {
checkpointingOffsets.put(storeMetadata.changelogPartition, storeMetadata.offset);
}
}

10
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

@ -132,7 +132,7 @@ public class RecordCollectorImpl implements RecordCollector { @@ -132,7 +132,7 @@ public class RecordCollectorImpl implements RecordCollector {
try {
producer.beginTransaction();
} catch (final ProducerFencedException error) {
throw new TaskMigratedException(taskId, "Producer get fenced trying to begin a new transaction", error);
throw new TaskMigratedException("Producer get fenced trying to begin a new transaction", error);
} catch (final KafkaException error) {
throw new StreamsException("Producer encounter unexpected error trying to begin a new transaction", error);
}
@ -169,7 +169,7 @@ public class RecordCollectorImpl implements RecordCollector { @@ -169,7 +169,7 @@ public class RecordCollectorImpl implements RecordCollector {
producer.commitTransaction();
transactionInFlight = false;
} catch (final ProducerFencedException error) {
throw new TaskMigratedException(taskId, "Producer get fenced trying to commit a transaction", error);
throw new TaskMigratedException("Producer get fenced trying to commit a transaction", error);
} catch (final TimeoutException error) {
// TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level
throw new StreamsException("Timed out while committing transaction via producer for task " + taskId, error);
@ -181,7 +181,7 @@ public class RecordCollectorImpl implements RecordCollector { @@ -181,7 +181,7 @@ public class RecordCollectorImpl implements RecordCollector {
try {
consumer.commitSync(offsets);
} catch (final CommitFailedException error) {
throw new TaskMigratedException(taskId, "Consumer committing offsets failed, " +
throw new TaskMigratedException("Consumer committing offsets failed, " +
"indicating the corresponding thread is no longer part of the group.", error);
} catch (final TimeoutException error) {
// TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level
@ -216,7 +216,7 @@ public class RecordCollectorImpl implements RecordCollector { @@ -216,7 +216,7 @@ public class RecordCollectorImpl implements RecordCollector {
} else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " +
"indicating the task may be migrated out.";
sendException = new TaskMigratedException(taskId, errorMessage, exception);
sendException = new TaskMigratedException(errorMessage, exception);
} else {
if (exception instanceof RetriableException) {
errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
@ -314,7 +314,7 @@ public class RecordCollectorImpl implements RecordCollector { @@ -314,7 +314,7 @@ public class RecordCollectorImpl implements RecordCollector {
if (isRecoverable(uncaughtException)) {
// producer.send() call may throw a KafkaException which wraps a FencedException,
// in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigrationException
throw new TaskMigratedException(taskId, "Producer cannot send records anymore since it got fenced", uncaughtException.getCause());
throw new TaskMigratedException("Producer cannot send records anymore since it got fenced", uncaughtException.getCause());
} else {
final String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, uncaughtException.toString());
throw new StreamsException(errorMessage, uncaughtException);

23
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java

@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@ -187,6 +186,16 @@ public class StandbyTask extends AbstractTask implements Task { @@ -187,6 +186,16 @@ public class StandbyTask extends AbstractTask implements Task {
transitionTo(State.CLOSED);
}
@Override
public boolean commitNeeded() {
return false;
}
@Override
public Map<TopicPartition, Long> changelogOffsets() {
return Collections.unmodifiableMap(stateMgr.changelogOffsets());
}
@Override
public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition);
@ -223,16 +232,4 @@ public class StandbyTask extends AbstractTask implements Task { @@ -223,16 +232,4 @@ public class StandbyTask extends AbstractTask implements Task {
return sb.toString();
}
public boolean commitNeeded() {
return false;
}
public Collection<TopicPartition> changelogPartitions() {
return stateMgr.changelogPartitions();
}
public Map<TopicPartition, Long> changelogOffsets() {
return Collections.unmodifiableMap(stateMgr.changelogOffsets());
}
}

33
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.TimeoutException;
@ -27,7 +28,9 @@ import org.apache.kafka.common.utils.LogContext; @@ -27,7 +28,9 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
@ -52,7 +55,6 @@ import java.util.stream.Collectors; @@ -52,7 +55,6 @@ import java.util.stream.Collectors;
* The reader also maintains the source of truth for restoration state: only active tasks restoring changelog could
* be completed, while standby tasks updating changelog would always be in restoring state after being initialized.
*/
// TODO K9113: we need to consider how to handle InvalidOffsetException for consumer#poll / position
public class StoreChangelogReader implements ChangelogReader {
enum ChangelogState {
@ -98,10 +100,6 @@ public class StoreChangelogReader implements ChangelogReader { @@ -98,10 +100,6 @@ public class StoreChangelogReader implements ChangelogReader {
private long totalRestored;
// only for active restoring tasks (for standby changelog it is null)
// NOTE we do not book keep the current offset since we leverage state manager as its source of truth
// the end offset beyond which records should not be applied (yet) to restore the states
//
// for both active restoring tasks and standby updating tasks, it is defined as:
@ -111,6 +109,8 @@ public class StoreChangelogReader implements ChangelogReader { @@ -111,6 +109,8 @@ public class StoreChangelogReader implements ChangelogReader {
// the log-end-offset only needs to be updated once and only need to be for active tasks since for standby
// tasks it would never "complete" based on the end-offset;
// the committed-offset needs to be updated periodically for those standby tasks
//
// NOTE we do not book keep the current offset since we leverage state manager as its source of truth
private Long restoreEndOffset;
// buffer records polled by the restore consumer;
@ -258,6 +258,8 @@ public class StoreChangelogReader implements ChangelogReader { @@ -258,6 +258,8 @@ public class StoreChangelogReader implements ChangelogReader {
// if we cannot get the position of the consumer within timeout, just return false
return false;
} catch (final KafkaException e) {
// this also includes InvalidOffsetException, which should not happen under normal
// execution, hence it is also okay to wrap it as fatal StreamsException
throw new StreamsException("Restore consumer get unexpected error trying to get the position " +
" of " + partition, e);
}
@ -411,6 +413,18 @@ public class StoreChangelogReader implements ChangelogReader { @@ -411,6 +413,18 @@ public class StoreChangelogReader implements ChangelogReader {
} catch (final FencedInstanceIdException e) {
// when the consumer gets fenced, all its tasks should be migrated
throw new TaskMigratedException("Restore consumer get fenced by instance-id polling records.", e);
} catch (final InvalidOffsetException e) {
log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
"the consumer's position has fallen out of the topic partition offset range because the topic was " +
"truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
"it later.", e.getClass().getName(), e.partitions());
final Map<TaskId, Set<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
for (final TopicPartition partition : e.partitions()) {
final TaskId taskId = changelogs.get(partition).stateManager.taskId();
taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
}
throw new TaskCorruptedException(taskWithCorruptedChangelogs);
} catch (final KafkaException e) {
throw new StreamsException("Restore consumer get unexpected error polling records.", e);
}
@ -428,7 +442,6 @@ public class StoreChangelogReader implements ChangelogReader { @@ -428,7 +442,6 @@ public class StoreChangelogReader implements ChangelogReader {
restoreChangelog(changelogs.get(partition));
}
maybeUpdateLimitOffsetsForStandbyChangelogs();
}
}
@ -681,6 +694,8 @@ public class StoreChangelogReader implements ChangelogReader { @@ -681,6 +694,8 @@ public class StoreChangelogReader implements ChangelogReader {
}
assignment.addAll(partitions);
restoreConsumer.assign(assignment);
log.debug("Added partitions {} to the restore consumer, current assignment is {}", partitions, assignment);
}
private void pauseChangelogsFromRestoreConsumer(final Collection<TopicPartition> partitions) {
@ -692,6 +707,8 @@ public class StoreChangelogReader implements ChangelogReader { @@ -692,6 +707,8 @@ public class StoreChangelogReader implements ChangelogReader {
"does not contain some of the partitions " + partitions + " for pausing.");
}
restoreConsumer.pause(partitions);
log.debug("Paused partitions {} from the restore consumer", partitions);
}
private void removeChangelogsFromRestoreConsumer(final Collection<TopicPartition> partitions) {
@ -715,6 +732,8 @@ public class StoreChangelogReader implements ChangelogReader { @@ -715,6 +732,8 @@ public class StoreChangelogReader implements ChangelogReader {
"does not contain some of the partitions " + partitions + " for resuming.");
}
restoreConsumer.resume(partitions);
log.debug("Resumed partitions {} from the restore consumer", partitions);
}
private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToRestore) {
@ -761,6 +780,8 @@ public class StoreChangelogReader implements ChangelogReader { @@ -761,6 +780,8 @@ public class StoreChangelogReader implements ChangelogReader {
} catch (final TimeoutException e) {
// if we cannot find the starting position at the beginning, just use the default 0L
} catch (final KafkaException e) {
// this also includes InvalidOffsetException, which should not happen under normal
// execution, hence it is also okay to wrap it as fatal StreamsException
throw new StreamsException("Restore consumer get unexpected error trying to get the position " +
" of " + partition, e);
}

83
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@ -49,7 +49,6 @@ import java.io.PrintWriter; @@ -49,7 +49,6 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -59,6 +58,7 @@ import java.util.function.Function; @@ -59,6 +58,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@ -71,10 +71,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -71,10 +71,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// visible for testing
static final byte LATEST_MAGIC_BYTE = 1;
private final Time time;
private final Logger log;
private final String logPrefix;
private final Time time;
private final String threadId;
private final Consumer<byte[], byte[]> consumer;
// we want to abstract eos logic out of StreamTask, however
@ -84,7 +83,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -84,7 +83,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final long maxTaskIdleMs;
private final int maxBufferedSize;
private final StreamsMetricsImpl streamsMetrics;
private final PartitionGroup partitionGroup;
private final RecordCollector recordCollector;
private final PartitionGroup.RecordInfo recordInfo;
@ -123,11 +121,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -123,11 +121,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
log = logContext.logger(getClass());
this.time = time;
this.streamsMetrics = streamsMetrics;
this.recordCollector = recordCollector;
eosDisabled = !StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
threadId = Thread.currentThread().getName();
final String threadId = Thread.currentThread().getName();
closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
final String taskId = id.toString();
if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
@ -239,12 +236,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -239,12 +236,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
if (state() == State.CREATED || state() == State.CLOSING || state() == State.SUSPENDED) {
// do nothing
log.trace("Skip suspending since state is {}", state());
} else {
if (state() == State.RUNNING) {
} else if (state() == State.RUNNING) {
closeTopology(true);
}
if (state() == State.RUNNING || state() == State.RESTORING) {
commitState();
// whenever we have successfully committed state during suspension, it is safe to checkpoint
// the state as well no matter if EOS is enabled or not
@ -254,12 +248,22 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -254,12 +248,22 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
partitionGroup.clear();
transitionTo(State.SUSPENDED);
log.info("Suspended active");
log.info("Suspended running");
} else if (state() == State.RESTORING) {
// we just checkpoint the position that we've restored up to without
// going through the commit process
stateMgr.flush();
stateMgr.checkpoint(emptyMap());
// we should also clear any buffered records of a task when suspending it
partitionGroup.clear();
transitionTo(State.SUSPENDED);
log.info("Suspended restoring");
} else {
throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id);
}
}
}
/**
* <pre>
@ -413,26 +417,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -413,26 +417,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
} else {
if (state() == State.RUNNING) {
closeTopology(clean);
}
if (state() == State.RUNNING || state() == State.RESTORING) {
if (clean) {
commitState();
// whenever we have successfully committed state, it is safe to checkpoint
// the state as well no matter if EOS is enabled or not
stateMgr.checkpoint(checkpointableOffsets());
} else if (eosDisabled) {
// if from unclean close, then only need to flush state to make sure that when later
// closing the states, there's no records triggering any processing anymore; also swallow all caught exceptions
// However, for a _clean_ shutdown, we try to commit and checkpoint. If there are any exceptions, they become
// fatal for the "closeClean()" call, and the caller can try again with closeDirty() to complete the shutdown.
try {
stateMgr.flush();
} catch (final RuntimeException error) {
log.debug("Ignoring flush error in unclean close.", error);
}
} else {
executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush");
}
transitionTo(State.CLOSING);
} else if (state() == State.RESTORING) {
executeAndMaybeSwallow(clean, () -> {
stateMgr.flush();
stateMgr.checkpoint(Collections.emptyMap());
}, "state manager flush and checkpoint");
transitionTo(State.CLOSING);
} else if (state() == State.SUSPENDED) {
// do not need to commit / checkpoint, since when suspending we've already committed the state
@ -444,12 +445,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -444,12 +445,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// if the latter throws and we re-close dirty which would close the state manager again.
StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory);
// if EOS is enabled, we wipe out the whole state store since they are invalid to use anymore
if (!eosDisabled) {
// if EOS is enabled, we wipe out the whole state store for unclean close
// since they are invalid to use anymore
if (!clean && !eosDisabled) {
StateManagerUtil.wipeStateStores(log, stateMgr);
}
closeRecordCollector(clean);
executeAndMaybeSwallow(clean, recordCollector::close, "record collector close");
} else {
throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id);
}
@ -457,7 +459,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -457,7 +459,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
partitionGroup.close();
closeTaskSensor.record();
streamsMetrics.removeAllTaskLevelSensors(threadId, id.toString());
transitionTo(State.CLOSED);
}
@ -467,6 +468,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -467,6 +468,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
* source topic partitions, or if it is enforced to be processable
*/
public boolean isProcessable(final long wallClockTime) {
if (state() == State.CLOSED || state() == State.CLOSING) {
// a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
// in either case we can just log it and move on without notifying the thread since the consumer
// would soon be updated to not return any records for this task anymore.
log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
return false;
}
if (partitionGroup.allPartitionsBuffered()) {
idleStartTime = RecordQueue.UNKNOWN;
return true;
@ -700,12 +710,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -700,12 +710,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
private void closeRecordCollector(final boolean clean) {
private void executeAndMaybeSwallow(final boolean clean, final Runnable runnable, final String name) {
try {
recordCollector.close();
runnable.run();
} catch (final RuntimeException e) {
if (clean) {
throw e;
} else {
log.debug("Ignoring error in unclean {}", name);
}
}
}
@ -719,12 +731,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -719,12 +731,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
*/
@Override
public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
if (state() == State.CLOSED || state() == State.CLOSING) {
log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
"Notifying the thread to trigger a new rebalance immediately.", id());
throw new TaskMigratedException(id());
}
final int newQueueSize = partitionGroup.addRawRecords(partition, records);
if (log.isTraceEnabled()) {
@ -918,11 +924,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, @@ -918,11 +924,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
return commitNeeded;
}
@Override
public Collection<TopicPartition> changelogPartitions() {
return stateMgr.changelogPartitions();
}
@Override
public Map<TopicPartition, Long> changelogOffsets() {
if (state() == State.RUNNING) {

37
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Time; @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
@ -262,7 +263,6 @@ public class StreamThread extends Thread { @@ -262,7 +263,6 @@ public class StreamThread extends Thread {
final Time time;
final Logger log;
AbstractTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
final StreamsMetricsImpl streamsMetrics,
@ -574,6 +574,7 @@ public class StreamThread extends Thread { @@ -574,6 +574,7 @@ public class StreamThread extends Thread {
changelogReader,
processId,
logPrefix,
streamsMetrics,
activeTaskCreator,
standbyTaskCreator,
builder,
@ -720,14 +721,11 @@ public class StreamThread extends Thread { @@ -720,14 +721,11 @@ public class StreamThread extends Thread {
try {
runLoop();
cleanRun = true;
} catch (final KafkaException e) {
log.error("Encountered the following unexpected Kafka exception during processing, " +
"this usually indicate Streams internal errors:", e);
throw e;
} catch (final Exception e) {
// we have caught all Kafka related exceptions, and other runtime exceptions
// should be due to user application errors
log.error("Encountered the following error during processing:", e);
log.error("Encountered the following exception during processing " +
"and the thread is going to shut down: ", e);
throw e;
} finally {
completeShutdown(cleanRun);
@ -747,15 +745,22 @@ public class StreamThread extends Thread { @@ -747,15 +745,22 @@ public class StreamThread extends Thread {
try {
runOnce();
if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
log.info("Version probing detected. Triggering new rebalance.");
log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
assignmentErrorCode.set(AssignorError.NONE.code());
enforceRebalance();
}
} catch (final TaskMigratedException ignoreAndRejoinGroup) {
log.warn("Detected task {} that got migrated to another thread. " +
} catch (final TaskCorruptedException e) {
log.warn("Detected the states of tasks {} are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e.corruptedTaskWithChangelogs());
taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
} catch (final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will try to rejoin the consumer group.", ignoreAndRejoinGroup.migratedTaskId());
"Will close out all assigned tasks and rejoin the consumer group.");
taskManager.handleLostAll();
enforceRebalance();
}
}
@ -849,7 +854,7 @@ public class StreamThread extends Thread { @@ -849,7 +854,7 @@ public class StreamThread extends Thread {
* 5. If one of the above happens, half the value of N.
*/
int processed = 0;
long timeSinceLastPoll = 0L;
long timeSinceLastPoll;
do {
for (int i = 0; i < numIterations; i++) {
@ -946,7 +951,7 @@ public class StreamThread extends Thread { @@ -946,7 +951,7 @@ public class StreamThread extends Thread {
if (originalReset.equals("earliest")) {
addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
} else if (originalReset.equals("latest")) {
} else {
addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
}
}
@ -974,16 +979,12 @@ public class StreamThread extends Thread { @@ -974,16 +979,12 @@ public class StreamThread extends Thread {
* @param records Records, can be null
*/
private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
for (final TopicPartition partition : records.partitions()) {
final Task task = taskManager.taskForInputPartition(partition);
if (task == null) {
log.error(
"Unable to locate active task for received-record partition {}. Current tasks: {}",
partition,
taskManager.toString(">")
);
log.error("Unable to locate active task for received-record partition {}. Current tasks: {}",
partition, taskManager.toString(">"));
throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
}

49
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java

@ -39,30 +39,30 @@ public interface Task { @@ -39,30 +39,30 @@ public interface Task {
*
* <pre>
* +-------------+
* +<---- | Created (0) |
* | +-----+-------+
* | |
* | v
* | +-----+-------+
* +<---- | Restoring(1)|<---------------+
* +<---- | Created (0) | <----------------------+
* | +-----+-------+ |
* | | |
* | +--------------------+ |
* | | | |
* | v v |
* | +-----+-------+ +----+---+----+
* | | Running (2) | ----> | Suspended(3)| * //TODO Suspended(3) could be removed after we've stable on KIP-429
* | +-----+-------+ +------+------+
* | | |
* | | |
* | v |
* | +-----+-------+ |
* +----> | Closing (4) | <------------+
* +-----+-------+
* |
* v
* +-----+-------+
* | Closed (5) |
* +<---- | Restoring(1)|<---------------+ |
* | +-----+-------+ | |
* | | | |
* | +--------------------+ | |
* | | | | |
* | v v | |
* | +-----+-------+ +----+---+----+ |
* | | Running (2) | ----> | Suspended(3)| | * //TODO Suspended(3) could be removed after we've stable on KIP-429
* | +-----+-------+ +------+------+ |
* | | | |
* | | | |
* | v | |
* | +-----+-------+ | |
* +----> | Closing (4) | <------------+ |
* +-----+-------+ |
* | |
* v |
* +-----+-------+ |
* | Closed (5) | -----------------------+
* +-------------+
* </pre>
*/
@ -72,7 +72,7 @@ public interface Task { @@ -72,7 +72,7 @@ public interface Task {
RUNNING(3, 4), // 2
SUSPENDED(1, 4), // 3
CLOSING(4, 5), // 4, we allow CLOSING to transit to itself to make close idempotent
CLOSED; // 5
CLOSED(0); // 5, we allow CLOSED to transit to CREATED to handle corrupted tasks
private final Set<Integer> validTransitions = new HashSet<>();
@ -154,6 +154,11 @@ public interface Task { @@ -154,6 +154,11 @@ public interface Task {
*/
void closeDirty();
/**
* Revive a closed task to a created one; should never throw an exception
*/
void revive();
StateStore getStore(final String name);
Set<TopicPartition> inputPartitions();
@ -169,6 +174,8 @@ public interface Task { @@ -169,6 +174,8 @@ public interface Task {
*/
Map<TopicPartition, Long> changelogOffsets();
void markChangelogAsCorrupted(final Set<TopicPartition> partitions);
default Map<TopicPartition, Long> purgableOffsets() {
return Collections.emptyMap();
}

102
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.StreamsException; @@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import java.io.File;
@ -56,40 +57,42 @@ public class TaskManager { @@ -56,40 +57,42 @@ public class TaskManager {
// by QueryableState
private final Logger log;
private final UUID processId;
private final ChangelogReader changelogReader;
private final String logPrefix;
private final StreamThread.AbstractTaskCreator<? extends Task> taskCreator;
private final InternalTopologyBuilder builder;
private final ChangelogReader changelogReader;
private final StreamsMetricsImpl streamsMetrics;
private final StreamThread.AbstractTaskCreator<? extends Task> activeTaskCreator;
private final StreamThread.AbstractTaskCreator<? extends Task> standbyTaskCreator;
private final Admin adminClient;
private DeleteRecordsResult deleteRecordsResult;
private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit
private final Map<TaskId, Task> tasks = new TreeMap<>();
// materializing this relationship because the lookup is on the hot path
private final Map<TopicPartition, Task> partitionToTask = new HashMap<>();
private final Admin adminClient;
private Consumer<byte[], byte[]> consumer;
private final InternalTopologyBuilder builder;
private DeleteRecordsResult deleteRecordsResult;
private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit
TaskManager(final ChangelogReader changelogReader,
final UUID processId,
final String logPrefix,
final StreamThread.AbstractTaskCreator<? extends Task> taskCreator,
final StreamsMetricsImpl streamsMetrics,
final StreamThread.AbstractTaskCreator<? extends Task> activeTaskCreator,
final StreamThread.AbstractTaskCreator<? extends Task> standbyTaskCreator,
final InternalTopologyBuilder builder,
final Admin adminClient) {
this.changelogReader = changelogReader;
this.builder = builder;
this.processId = processId;
this.logPrefix = logPrefix;
this.taskCreator = taskCreator;
this.adminClient = adminClient;
this.streamsMetrics = streamsMetrics;
this.changelogReader = changelogReader;
this.activeTaskCreator = activeTaskCreator;
this.standbyTaskCreator = standbyTaskCreator;
this.builder = builder;
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
this.adminClient = adminClient;
final LogContext logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
}
void setConsumer(final Consumer<byte[], byte[]> consumer) {
@ -118,6 +121,29 @@ public class TaskManager { @@ -118,6 +121,29 @@ public class TaskManager {
rebalanceInProgress = false;
}
void handleCorruption(final Map<TaskId, Set<TopicPartition>> taskWithChangelogs) {
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
final TaskId taskId = entry.getKey();
final Task task = tasks.get(taskId);
// this call is idempotent so even if the task is only CREATED we can still call it
changelogReader.remove(task.changelogPartitions());
// mark corrupted partitions to not be checkpointed, and then close the task as dirty
final Set<TopicPartition> corruptedPartitions = entry.getValue();
task.markChangelogAsCorrupted(corruptedPartitions);
try {
task.closeClean();
} catch (final RuntimeException e) {
log.error("Failed to close task {} cleanly while handling corrupted tasks. Attempting to re-close it as dirty.", task.id());
task.closeDirty();
}
task.revive();
}
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
* @throws StreamsException fatal error while creating / initializing the task
@ -148,9 +174,9 @@ public class TaskManager { @@ -148,9 +174,9 @@ public class TaskManager {
task.resume();
standbyTasksToCreate.remove(task.id());
} else /* we previously owned this task, and we don't have it anymore, or it has changed active/standby state */ {
final Set<TopicPartition> inputPartitions = task.inputPartitions();
cleanupTask(task);
try {
changelogReader.remove(task.changelogPartitions());
task.closeClean();
} catch (final RuntimeException e) {
log.error(String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id()), e);
@ -159,9 +185,7 @@ public class TaskManager { @@ -159,9 +185,7 @@ public class TaskManager {
// Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
task.closeDirty();
}
for (final TopicPartition inputPartition : inputPartitions) {
partitionToTask.remove(inputPartition);
}
iterator.remove();
}
}
@ -176,7 +200,7 @@ public class TaskManager { @@ -176,7 +200,7 @@ public class TaskManager {
}
if (!activeTasksToCreate.isEmpty()) {
taskCreator.createTasks(consumer, activeTasksToCreate).forEach(this::addNewTask);
activeTaskCreator.createTasks(consumer, activeTasksToCreate).forEach(this::addNewTask);
}
if (!standbyTasksToCreate.isEmpty()) {
@ -293,20 +317,15 @@ public class TaskManager { @@ -293,20 +317,15 @@ public class TaskManager {
final Iterator<Task> iterator = tasks.values().iterator();
while (iterator.hasNext()) {
final Task task = iterator.next();
final Set<TopicPartition> inputPartitions = task.inputPartitions();
// Even though we've apparently dropped out of the group, we can continue safely to maintain our
// standby tasks while we rejoin.
if (task.isActive()) {
cleanupTask(task);
task.closeDirty();
changelogReader.remove(task.changelogPartitions());
}
for (final TopicPartition inputPartition : inputPartitions) {
partitionToTask.remove(inputPartition);
}
iterator.remove();
}
}
}
/**
* Returns ids of tasks whose states are kept on the local storage. This includes active, standby, and previously
@ -320,7 +339,7 @@ public class TaskManager { @@ -320,7 +339,7 @@ public class TaskManager {
final Set<TaskId> locallyStoredTasks = new HashSet<>();
final File[] stateDirs = taskCreator.stateDirectory().listTaskDirectories();
final File[] stateDirs = activeTaskCreator.stateDirectory().listTaskDirectories();
if (stateDirs != null) {
for (final File dir : stateDirs) {
try {
@ -339,12 +358,27 @@ public class TaskManager { @@ -339,12 +358,27 @@ public class TaskManager {
return locallyStoredTasks;
}
private void cleanupTask(final Task task) {
// 1. remove the changelog partitions from changelog reader;
// 2. remove the input partitions from the materialized map;
// 3. remove the task metrics from the metrics registry
changelogReader.remove(task.changelogPartitions());
for (final TopicPartition inputPartition : task.inputPartitions()) {
partitionToTask.remove(inputPartition);
}
final String threadId = Thread.currentThread().getName();
streamsMetrics.removeAllTaskLevelSensors(threadId, task.id().toString());
}
void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
final Iterator<Task> iterator = tasks.values().iterator();
while (iterator.hasNext()) {
final Task task = iterator.next();
final Set<TopicPartition> inputPartitions = task.inputPartitions();
cleanupTask(task);
if (clean) {
try {
task.closeClean();
@ -358,16 +392,10 @@ public class TaskManager { @@ -358,16 +392,10 @@ public class TaskManager {
} else {
task.closeDirty();
}
changelogReader.remove(task.changelogPartitions());
for (final TopicPartition inputPartition : inputPartitions) {
partitionToTask.remove(inputPartition);
}
iterator.remove();
}
taskCreator.close();
activeTaskCreator.close();
final RuntimeException fatalException = firstException.get();
if (fatalException != null) {

44
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

@ -1321,6 +1321,46 @@ public class StreamTaskTest { @@ -1321,6 +1321,46 @@ public class StreamTaskTest {
verify(stateManager);
}
@Test
public void shouldNotCommitOnSuspendRestoring() {
stateManager.flush();
EasyMock.expectLastCall();
stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
EasyMock.expectLastCall();
recordCollector.commit(EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new AssertionError("Should not call this function")).anyTimes();
EasyMock.replay(stateManager);
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
task.initializeIfNeeded();
task.suspend();
assertEquals(Task.State.SUSPENDED, task.state());
verify(stateManager);
}
@Test
public void shouldNotCommitOnCloseRestoring() {
stateManager.flush();
EasyMock.expectLastCall();
stateManager.checkpoint(EasyMock.eq(Collections.emptyMap()));
EasyMock.expectLastCall();
recordCollector.commit(EasyMock.anyObject());
EasyMock.expectLastCall().andThrow(new AssertionError("Should not call this function")).anyTimes();
EasyMock.replay(stateManager);
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
task.initializeIfNeeded();
task.closeClean();
assertEquals(Task.State.CLOSED, task.state());
verify(stateManager);
}
@Test
public void shouldCommitOnCloseClean() {
final long offset = 543L;
@ -1339,6 +1379,7 @@ public class StreamTaskTest { @@ -1339,6 +1379,7 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
task.closeClean();
assertEquals(Task.State.CLOSED, task.state());
@ -1365,6 +1406,7 @@ public class StreamTaskTest { @@ -1365,6 +1406,7 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
assertThrows(ProcessorStateException.class, task::closeClean);
@ -1418,7 +1460,7 @@ public class StreamTaskTest { @@ -1418,7 +1460,7 @@ public class StreamTaskTest {
EasyMock.expectLastCall();
stateManager.flush();
EasyMock.expectLastCall();
stateManager.checkpoint(Collections.singletonMap(changelogPartition, offset));
stateManager.checkpoint(Collections.emptyMap());
EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
stateManager.close();
EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();

6
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

@ -73,7 +73,6 @@ import org.apache.kafka.test.TestUtils; @@ -73,7 +73,6 @@ import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
@ -1364,12 +1363,11 @@ public class StreamThreadTest { @@ -1364,12 +1363,11 @@ public class StreamThreadTest {
assertTrue(metadata.standbyTasks().isEmpty());
}
@Ignore
@Test
// FIXME: should unblock this test after we added invalid offset handling
public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
.groupByKey().count(Materialized.as("count"));
.groupByKey()
.count(Materialized.as("count"));
internalStreamsBuilder.buildAndOptimizeTopology();
final StreamThread thread = createStreamThread("clientId", config, false);

7
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

@ -25,9 +25,12 @@ import org.apache.kafka.clients.consumer.Consumer; @@ -25,9 +25,12 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
@ -115,9 +118,11 @@ public class TaskManagerTest { @@ -115,9 +118,11 @@ public class TaskManagerTest {
@Before
public void setUp() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
taskManager = new TaskManager(changeLogReader,
UUID.randomUUID(),
"",
"taskManagerTest",
streamsMetrics,
activeTaskCreator,
standbyTaskCreator,
topologyBuilder,

Loading…
Cancel
Save