diff --git a/clients/src/main/java/org/apache/kafka/common/utils/FixedOrderMap.java b/clients/src/main/java/org/apache/kafka/common/utils/FixedOrderMap.java index 65188788770..175282e7e09 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/FixedOrderMap.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/FixedOrderMap.java @@ -50,12 +50,6 @@ public final class FixedOrderMap extends LinkedHashMap { throw new UnsupportedOperationException("Removing from registeredStores is not allowed"); } - @Deprecated - @Override - public void clear() { - throw new UnsupportedOperationException("Removing from registeredStores is not allowed"); - } - @Override public FixedOrderMap clone() { throw new UnsupportedOperationException(); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java b/clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java index 7d3f3f75256..d4b41519d79 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java @@ -69,18 +69,4 @@ public class FixedOrderMapTest { } assertThat(map.get("a"), is(0)); } - - @SuppressWarnings("deprecation") - @Test - public void shouldForbidConditionalClear() { - final FixedOrderMap map = new FixedOrderMap<>(); - map.put("a", 0); - try { - map.clear(); - fail("expected exception"); - } catch (final RuntimeException e) { - assertThat(e, CoreMatchers.instanceOf(UnsupportedOperationException.class)); - } - assertThat(map.get("a"), is(0)); - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java new file mode 100644 index 00000000000..f1a3b48320b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; + +import java.util.Map; +import java.util.Set; + +/** + * Indicates a specific task is corrupted and need to be re-initialized. It can be thrown when + * + * 1) Under EOS, if the checkpoint file does not contain offsets for corresponding store's changelogs, meaning + * previously it was not close cleanly; + * 2) Out-of-range exception thrown during restoration, meaning that the changelog has been modified and we re-bootstrap + * the store. + */ +public class TaskCorruptedException extends StreamsException { + + private final Map> taskWithChangelogs; + + public TaskCorruptedException(final Map> taskWithChangelogs) { + super("Tasks with changelogs " + taskWithChangelogs + " are corrupted and hence needs to be re-initialized"); + + this.taskWithChangelogs = taskWithChangelogs; + } + + public Map> corruptedTaskWithChangelogs() { + return taskWithChangelogs; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java index a3307f55a2a..4a758b40473 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java @@ -17,58 +17,16 @@ package org.apache.kafka.streams.errors; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.processor.TaskId; - /** - * Indicates that one or more tasks got migrated to another thread. - * - * 1) if the task field is specified, then that single task should be cleaned up and closed as "zombie" while the - * thread can continue as normal; - * 2) if no tasks are specified (i.e. taskId == null), it means that the hosted thread has been fenced and all - * tasks are migrated, in which case the thread should rejoin the group + * Indicates that all tasks belongs to the thread have migrated to another thread. This exception can be thrown when + * the thread gets fenced (either by the consumer coordinator or by the transaction coordinator), which means it is + * no longer part of the group but a "zombie" already */ public class TaskMigratedException extends StreamsException { private final static long serialVersionUID = 1L; - private final TaskId taskId; - - public TaskMigratedException(final TaskId taskId, - final TopicPartition topicPartition, - final long endOffset, - final long pos) { - this(taskId, String.format("Log end offset of %s should not change while restoring: old end offset %d, current offset %d", - topicPartition, - endOffset, - pos), null); - } - - public TaskMigratedException(final TaskId taskId) { - this(taskId, String.format("Task %s is unexpectedly closed during processing", taskId), null); - } - - public TaskMigratedException(final TaskId taskId, - final Throwable throwable) { - this(taskId, String.format("Client request for task %s has been fenced due to a rebalance", taskId), throwable); - } - - public TaskMigratedException(final TaskId taskId, - final String message, - final Throwable throwable) { - super(message, throwable); - this.taskId = taskId; - } - public TaskMigratedException(final String message, final Throwable throwable) { - this(null, message + " It means all tasks belonging to this thread have been migrated", throwable); - } - - public TaskId migratedTaskId() { - return taskId; - } - - public TaskMigratedException() { - this(null, "A task has been migrated unexpectedly", null); + super(message + "; It means all tasks belonging to this thread should be migrated", throwable); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index ca687316d5e..ee04b25ab84 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -20,8 +20,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import java.util.Collection; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED; import static org.apache.kafka.streams.processor.internals.Task.State.CREATED; public abstract class AbstractTask implements Task { @@ -55,6 +57,16 @@ public abstract class AbstractTask implements Task { return partitions; } + @Override + public Collection changelogPartitions() { + return stateMgr.changelogPartitions(); + } + + @Override + public void markChangelogAsCorrupted(final Set partitions) { + stateMgr.markChangelogAsCorrupted(partitions); + } + @Override public StateStore getStore(final String name) { return stateMgr.getStore(name); @@ -70,6 +82,15 @@ public abstract class AbstractTask implements Task { return state; } + @Override + public void revive() { + if (state == CLOSED) { + transitionTo(CREATED); + } else { + throw new IllegalStateException("Illegal state " + state() + " while reviving task " + id); + } + } + final void transitionTo(final Task.State newState) { final State oldState = state(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index d2cb0917594..9a58d5fd7b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -37,6 +37,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static java.lang.String.format; @@ -84,11 +85,15 @@ public class ProcessorStateManager implements StateManager { // update blindly with the given offset private Long offset; + // corrupted state store should not be included in checkpointing + private boolean corrupted; + private StateStoreMetadata(final StateStore stateStore) { this.stateStore = stateStore; this.restoreCallback = null; this.recordConverter = null; this.changelogPartition = null; + this.corrupted = false; this.offset = null; } @@ -282,6 +287,20 @@ public class ProcessorStateManager implements StateManager { return changelogOffsets().keySet(); } + void markChangelogAsCorrupted(final Set partitions) { + for (final StateStoreMetadata storeMetadata : stores.values()) { + if (partitions.contains(storeMetadata.changelogPartition)) { + storeMetadata.corrupted = true; + partitions.remove(storeMetadata.changelogPartition); + } + } + + if (!partitions.isEmpty()) { + throw new IllegalStateException("Some partitions " + partitions + " are not contained in the store list of task " + + taskId + " marking as corrupted, this is not expected"); + } + } + @Override public Map changelogOffsets() { // return the current offsets for those logged stores @@ -415,6 +434,8 @@ public class ProcessorStateManager implements StateManager { log.error("Failed to close state store {}: ", store.name(), exception); } } + + stores.clear(); } if (firstException != null) { @@ -439,10 +460,11 @@ public class ProcessorStateManager implements StateManager { final Map checkpointingOffsets = new HashMap<>(); for (final StateStoreMetadata storeMetadata : stores.values()) { - // store is logged, persistent, and has a valid current offset + // store is logged, persistent, not corrupted, and has a valid current offset if (storeMetadata.changelogPartition != null && storeMetadata.stateStore.persistent() && - storeMetadata.offset != null) { + storeMetadata.offset != null && + !storeMetadata.corrupted) { checkpointingOffsets.put(storeMetadata.changelogPartition, storeMetadata.offset); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index e84197becc0..950412dfc1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -132,7 +132,7 @@ public class RecordCollectorImpl implements RecordCollector { try { producer.beginTransaction(); } catch (final ProducerFencedException error) { - throw new TaskMigratedException(taskId, "Producer get fenced trying to begin a new transaction", error); + throw new TaskMigratedException("Producer get fenced trying to begin a new transaction", error); } catch (final KafkaException error) { throw new StreamsException("Producer encounter unexpected error trying to begin a new transaction", error); } @@ -169,7 +169,7 @@ public class RecordCollectorImpl implements RecordCollector { producer.commitTransaction(); transactionInFlight = false; } catch (final ProducerFencedException error) { - throw new TaskMigratedException(taskId, "Producer get fenced trying to commit a transaction", error); + throw new TaskMigratedException("Producer get fenced trying to commit a transaction", error); } catch (final TimeoutException error) { // TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level throw new StreamsException("Timed out while committing transaction via producer for task " + taskId, error); @@ -181,7 +181,7 @@ public class RecordCollectorImpl implements RecordCollector { try { consumer.commitSync(offsets); } catch (final CommitFailedException error) { - throw new TaskMigratedException(taskId, "Consumer committing offsets failed, " + + throw new TaskMigratedException("Consumer committing offsets failed, " + "indicating the corresponding thread is no longer part of the group.", error); } catch (final TimeoutException error) { // TODO KIP-447: we can consider treating it as non-fatal and retry on the thread level @@ -216,7 +216,7 @@ public class RecordCollectorImpl implements RecordCollector { } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) { errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " + "indicating the task may be migrated out."; - sendException = new TaskMigratedException(taskId, errorMessage, exception); + sendException = new TaskMigratedException(errorMessage, exception); } else { if (exception instanceof RetriableException) { errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " + @@ -314,7 +314,7 @@ public class RecordCollectorImpl implements RecordCollector { if (isRecoverable(uncaughtException)) { // producer.send() call may throw a KafkaException which wraps a FencedException, // in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigrationException - throw new TaskMigratedException(taskId, "Producer cannot send records anymore since it got fenced", uncaughtException.getCause()); + throw new TaskMigratedException("Producer cannot send records anymore since it got fenced", uncaughtException.getCause()); } else { final String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic, taskId, uncaughtException.toString()); throw new StreamsException(errorMessage, uncaughtException); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 70cbed66ae0..6349d640b23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.slf4j.Logger; -import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -187,6 +186,16 @@ public class StandbyTask extends AbstractTask implements Task { transitionTo(State.CLOSED); } + @Override + public boolean commitNeeded() { + return false; + } + + @Override + public Map changelogOffsets() { + return Collections.unmodifiableMap(stateMgr.changelogOffsets()); + } + @Override public void addRecords(final TopicPartition partition, final Iterable> records) { throw new IllegalStateException("Attempted to add records to task " + id() + " for invalid input partition " + partition); @@ -223,16 +232,4 @@ public class StandbyTask extends AbstractTask implements Task { return sb.toString(); } - - public boolean commitNeeded() { - return false; - } - - public Collection changelogPartitions() { - return stateMgr.changelogPartitions(); - } - - public Map changelogOffsets() { - return Collections.unmodifiableMap(stateMgr.changelogOffsets()); - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 0eea8720bb9..e2e8ddb182a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.TimeoutException; @@ -27,7 +28,9 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata; import org.apache.kafka.streams.processor.StateRestoreListener; import org.slf4j.Logger; @@ -52,7 +55,6 @@ import java.util.stream.Collectors; * The reader also maintains the source of truth for restoration state: only active tasks restoring changelog could * be completed, while standby tasks updating changelog would always be in restoring state after being initialized. */ -// TODO K9113: we need to consider how to handle InvalidOffsetException for consumer#poll / position public class StoreChangelogReader implements ChangelogReader { enum ChangelogState { @@ -98,10 +100,6 @@ public class StoreChangelogReader implements ChangelogReader { private long totalRestored; - // only for active restoring tasks (for standby changelog it is null) - // NOTE we do not book keep the current offset since we leverage state manager as its source of truth - - // the end offset beyond which records should not be applied (yet) to restore the states // // for both active restoring tasks and standby updating tasks, it is defined as: @@ -111,6 +109,8 @@ public class StoreChangelogReader implements ChangelogReader { // the log-end-offset only needs to be updated once and only need to be for active tasks since for standby // tasks it would never "complete" based on the end-offset; // the committed-offset needs to be updated periodically for those standby tasks + // + // NOTE we do not book keep the current offset since we leverage state manager as its source of truth private Long restoreEndOffset; // buffer records polled by the restore consumer; @@ -258,6 +258,8 @@ public class StoreChangelogReader implements ChangelogReader { // if we cannot get the position of the consumer within timeout, just return false return false; } catch (final KafkaException e) { + // this also includes InvalidOffsetException, which should not happen under normal + // execution, hence it is also okay to wrap it as fatal StreamsException throw new StreamsException("Restore consumer get unexpected error trying to get the position " + " of " + partition, e); } @@ -411,6 +413,18 @@ public class StoreChangelogReader implements ChangelogReader { } catch (final FencedInstanceIdException e) { // when the consumer gets fenced, all its tasks should be migrated throw new TaskMigratedException("Restore consumer get fenced by instance-id polling records.", e); + } catch (final InvalidOffsetException e) { + log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " + + "the consumer's position has fallen out of the topic partition offset range because the topic was " + + "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" + + "it later.", e.getClass().getName(), e.partitions()); + + final Map> taskWithCorruptedChangelogs = new HashMap<>(); + for (final TopicPartition partition : e.partitions()) { + final TaskId taskId = changelogs.get(partition).stateManager.taskId(); + taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition); + } + throw new TaskCorruptedException(taskWithCorruptedChangelogs); } catch (final KafkaException e) { throw new StreamsException("Restore consumer get unexpected error polling records.", e); } @@ -428,7 +442,6 @@ public class StoreChangelogReader implements ChangelogReader { restoreChangelog(changelogs.get(partition)); } - maybeUpdateLimitOffsetsForStandbyChangelogs(); } } @@ -681,6 +694,8 @@ public class StoreChangelogReader implements ChangelogReader { } assignment.addAll(partitions); restoreConsumer.assign(assignment); + + log.debug("Added partitions {} to the restore consumer, current assignment is {}", partitions, assignment); } private void pauseChangelogsFromRestoreConsumer(final Collection partitions) { @@ -692,6 +707,8 @@ public class StoreChangelogReader implements ChangelogReader { "does not contain some of the partitions " + partitions + " for pausing."); } restoreConsumer.pause(partitions); + + log.debug("Paused partitions {} from the restore consumer", partitions); } private void removeChangelogsFromRestoreConsumer(final Collection partitions) { @@ -715,6 +732,8 @@ public class StoreChangelogReader implements ChangelogReader { "does not contain some of the partitions " + partitions + " for resuming."); } restoreConsumer.resume(partitions); + + log.debug("Resumed partitions {} from the restore consumer", partitions); } private void prepareChangelogs(final Set newPartitionsToRestore) { @@ -761,6 +780,8 @@ public class StoreChangelogReader implements ChangelogReader { } catch (final TimeoutException e) { // if we cannot find the starting position at the beginning, just use the default 0L } catch (final KafkaException e) { + // this also includes InvalidOffsetException, which should not happen under normal + // execution, hence it is also okay to wrap it as fatal StreamsException throw new StreamsException("Restore consumer get unexpected error trying to get the position " + " of " + partition, e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 98f8537eeaf..f50ab008c0d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -49,7 +49,6 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.nio.ByteBuffer; import java.util.Base64; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -59,6 +58,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import static java.lang.String.format; +import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; @@ -71,10 +71,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, // visible for testing static final byte LATEST_MAGIC_BYTE = 1; + private final Time time; private final Logger log; private final String logPrefix; - private final Time time; - private final String threadId; private final Consumer consumer; // we want to abstract eos logic out of StreamTask, however @@ -84,7 +83,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private final long maxTaskIdleMs; private final int maxBufferedSize; - private final StreamsMetricsImpl streamsMetrics; private final PartitionGroup partitionGroup; private final RecordCollector recordCollector; private final PartitionGroup.RecordInfo recordInfo; @@ -123,11 +121,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, log = logContext.logger(getClass()); this.time = time; - this.streamsMetrics = streamsMetrics; this.recordCollector = recordCollector; eosDisabled = !StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); - threadId = Thread.currentThread().getName(); + final String threadId = Thread.currentThread().getName(); closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics); final String taskId = id.toString(); if (streamsMetrics.version() == Version.FROM_0100_TO_24) { @@ -239,25 +236,32 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, if (state() == State.CREATED || state() == State.CLOSING || state() == State.SUSPENDED) { // do nothing log.trace("Skip suspending since state is {}", state()); + } else if (state() == State.RUNNING) { + closeTopology(true); + + commitState(); + // whenever we have successfully committed state during suspension, it is safe to checkpoint + // the state as well no matter if EOS is enabled or not + stateMgr.checkpoint(checkpointableOffsets()); + + // we should also clear any buffered records of a task when suspending it + partitionGroup.clear(); + + transitionTo(State.SUSPENDED); + log.info("Suspended running"); + } else if (state() == State.RESTORING) { + // we just checkpoint the position that we've restored up to without + // going through the commit process + stateMgr.flush(); + stateMgr.checkpoint(emptyMap()); + + // we should also clear any buffered records of a task when suspending it + partitionGroup.clear(); + + transitionTo(State.SUSPENDED); + log.info("Suspended restoring"); } else { - if (state() == State.RUNNING) { - closeTopology(true); - } - - if (state() == State.RUNNING || state() == State.RESTORING) { - commitState(); - // whenever we have successfully committed state during suspension, it is safe to checkpoint - // the state as well no matter if EOS is enabled or not - stateMgr.checkpoint(checkpointableOffsets()); - - // we should also clear any buffered records of a task when suspending it - partitionGroup.clear(); - - transitionTo(State.SUSPENDED); - log.info("Suspended active"); - } else { - throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id); - } + throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id); } } @@ -413,26 +417,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } else { if (state() == State.RUNNING) { closeTopology(clean); - } - if (state() == State.RUNNING || state() == State.RESTORING) { if (clean) { commitState(); // whenever we have successfully committed state, it is safe to checkpoint // the state as well no matter if EOS is enabled or not stateMgr.checkpoint(checkpointableOffsets()); - } else if (eosDisabled) { - // if from unclean close, then only need to flush state to make sure that when later - // closing the states, there's no records triggering any processing anymore; also swallow all caught exceptions - // However, for a _clean_ shutdown, we try to commit and checkpoint. If there are any exceptions, they become - // fatal for the "closeClean()" call, and the caller can try again with closeDirty() to complete the shutdown. - try { - stateMgr.flush(); - } catch (final RuntimeException error) { - log.debug("Ignoring flush error in unclean close.", error); - } + } else { + executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush"); } + transitionTo(State.CLOSING); + } else if (state() == State.RESTORING) { + executeAndMaybeSwallow(clean, () -> { + stateMgr.flush(); + stateMgr.checkpoint(Collections.emptyMap()); + }, "state manager flush and checkpoint"); + transitionTo(State.CLOSING); } else if (state() == State.SUSPENDED) { // do not need to commit / checkpoint, since when suspending we've already committed the state @@ -444,12 +445,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, // if the latter throws and we re-close dirty which would close the state manager again. StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory); - // if EOS is enabled, we wipe out the whole state store since they are invalid to use anymore - if (!eosDisabled) { + // if EOS is enabled, we wipe out the whole state store for unclean close + // since they are invalid to use anymore + if (!clean && !eosDisabled) { StateManagerUtil.wipeStateStores(log, stateMgr); } - closeRecordCollector(clean); + executeAndMaybeSwallow(clean, recordCollector::close, "record collector close"); } else { throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id); } @@ -457,7 +459,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, partitionGroup.close(); closeTaskSensor.record(); - streamsMetrics.removeAllTaskLevelSensors(threadId, id.toString()); transitionTo(State.CLOSED); } @@ -467,6 +468,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, * source topic partitions, or if it is enforced to be processable */ public boolean isProcessable(final long wallClockTime) { + if (state() == State.CLOSED || state() == State.CLOSING) { + // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing; + // in either case we can just log it and move on without notifying the thread since the consumer + // would soon be updated to not return any records for this task anymore. + log.info("Stream task {} is already in {} state, skip processing it.", id(), state()); + + return false; + } + if (partitionGroup.allPartitionsBuffered()) { idleStartTime = RecordQueue.UNKNOWN; return true; @@ -700,12 +710,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } } - private void closeRecordCollector(final boolean clean) { + private void executeAndMaybeSwallow(final boolean clean, final Runnable runnable, final String name) { try { - recordCollector.close(); + runnable.run(); } catch (final RuntimeException e) { if (clean) { throw e; + } else { + log.debug("Ignoring error in unclean {}", name); } } } @@ -719,12 +731,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, */ @Override public void addRecords(final TopicPartition partition, final Iterable> records) { - if (state() == State.CLOSED || state() == State.CLOSING) { - log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " + - "Notifying the thread to trigger a new rebalance immediately.", id()); - throw new TaskMigratedException(id()); - } - final int newQueueSize = partitionGroup.addRawRecords(partition, records); if (log.isTraceEnabled()) { @@ -918,11 +924,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, return commitNeeded; } - @Override - public Collection changelogPartitions() { - return stateMgr.changelogPartitions(); - } - @Override public Map changelogOffsets() { if (state() == State.RUNNING) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 0ea495252a8..0df38fdbf35 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; @@ -262,7 +263,6 @@ public class StreamThread extends Thread { final Time time; final Logger log; - AbstractTaskCreator(final InternalTopologyBuilder builder, final StreamsConfig config, final StreamsMetricsImpl streamsMetrics, @@ -574,6 +574,7 @@ public class StreamThread extends Thread { changelogReader, processId, logPrefix, + streamsMetrics, activeTaskCreator, standbyTaskCreator, builder, @@ -720,14 +721,11 @@ public class StreamThread extends Thread { try { runLoop(); cleanRun = true; - } catch (final KafkaException e) { - log.error("Encountered the following unexpected Kafka exception during processing, " + - "this usually indicate Streams internal errors:", e); - throw e; } catch (final Exception e) { // we have caught all Kafka related exceptions, and other runtime exceptions // should be due to user application errors - log.error("Encountered the following error during processing:", e); + log.error("Encountered the following exception during processing " + + "and the thread is going to shut down: ", e); throw e; } finally { completeShutdown(cleanRun); @@ -747,15 +745,22 @@ public class StreamThread extends Thread { try { runOnce(); if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) { - log.info("Version probing detected. Triggering new rebalance."); + log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance."); + assignmentErrorCode.set(AssignorError.NONE.code()); enforceRebalance(); } - } catch (final TaskMigratedException ignoreAndRejoinGroup) { - log.warn("Detected task {} that got migrated to another thread. " + - "This implies that this thread missed a rebalance and dropped out of the consumer group. " + - "Will try to rejoin the consumer group.", ignoreAndRejoinGroup.migratedTaskId()); + } catch (final TaskCorruptedException e) { + log.warn("Detected the states of tasks {} are corrupted. " + + "Will close the task as dirty and re-create and bootstrap from scratch.", e.corruptedTaskWithChangelogs()); + taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); + } catch (final TaskMigratedException e) { + log.warn("Detected that the thread is being fenced. " + + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + + "Will close out all assigned tasks and rejoin the consumer group."); + + taskManager.handleLostAll(); enforceRebalance(); } } @@ -849,7 +854,7 @@ public class StreamThread extends Thread { * 5. If one of the above happens, half the value of N. */ int processed = 0; - long timeSinceLastPoll = 0L; + long timeSinceLastPoll; do { for (int i = 0; i < numIterations; i++) { @@ -946,7 +951,7 @@ public class StreamThread extends Thread { if (originalReset.equals("earliest")) { addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics); - } else if (originalReset.equals("latest")) { + } else { addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics); } } @@ -974,16 +979,12 @@ public class StreamThread extends Thread { * @param records Records, can be null */ private void addRecordsToTasks(final ConsumerRecords records) { - for (final TopicPartition partition : records.partitions()) { final Task task = taskManager.taskForInputPartition(partition); if (task == null) { - log.error( - "Unable to locate active task for received-record partition {}. Current tasks: {}", - partition, - taskManager.toString(">") - ); + log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", + partition, taskManager.toString(">")); throw new NullPointerException("Task was unexpectedly missing for partition " + partition); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index e91b885f930..18d12bffb5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -39,30 +39,30 @@ public interface Task { * *
      *                 +-------------+
-     *          +<---- | Created (0) |
-     *          |      +-----+-------+
-     *          |            |
-     *          |            v
-     *          |      +-----+-------+
-     *          +<---- | Restoring(1)|<---------------+
-     *          |      +-----+-------+                |
-     *          |            |                        |
-     *          |            +--------------------+   |
-     *          |            |                    |   |
-     *          |            v                    v   |
-     *          |      +-----+-------+       +----+---+----+
-     *          |      | Running (2) | ----> | Suspended(3)|   * //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |      +-----+-------+       +------+------+
-     *          |            |                      |
-     *          |            |                      |
-     *          |            v                      |
-     *          |      +-----+-------+              |
-     *          +----> | Closing (4) | <------------+
-     *                 +-----+-------+
-     *                       |
-     *                       v
-     *                 +-----+-------+
-     *                 | Closed (5)  |
+     *          +<---- | Created (0) | <----------------------+
+     *          |      +-----+-------+                        |
+     *          |            |                                |
+     *          |            v                                |
+     *          |      +-----+-------+                        |
+     *          +<---- | Restoring(1)|<---------------+       |
+     *          |      +-----+-------+                |       |
+     *          |            |                        |       |
+     *          |            +--------------------+   |       |
+     *          |            |                    |   |       |
+     *          |            v                    v   |       |
+     *          |      +-----+-------+       +----+---+----+  |
+     *          |      | Running (2) | ----> | Suspended(3)|  |    * //TODO Suspended(3) could be removed after we've stable on KIP-429
+     *          |      +-----+-------+       +------+------+  |
+     *          |            |                      |         |
+     *          |            |                      |         |
+     *          |            v                      |         |
+     *          |      +-----+-------+              |         |
+     *          +----> | Closing (4) | <------------+         |
+     *                 +-----+-------+                        |
+     *                       |                                |
+     *                       v                                |
+     *                 +-----+-------+                        |
+     *                 | Closed (5)  | -----------------------+
      *                 +-------------+
      * 
*/ @@ -72,7 +72,7 @@ public interface Task { RUNNING(3, 4), // 2 SUSPENDED(1, 4), // 3 CLOSING(4, 5), // 4, we allow CLOSING to transit to itself to make close idempotent - CLOSED; // 5 + CLOSED(0); // 5, we allow CLOSED to transit to CREATED to handle corrupted tasks private final Set validTransitions = new HashSet<>(); @@ -154,6 +154,11 @@ public interface Task { */ void closeDirty(); + /** + * Revive a closed task to a created one; should never throw an exception + */ + void revive(); + StateStore getStore(final String name); Set inputPartitions(); @@ -169,6 +174,8 @@ public interface Task { */ Map changelogOffsets(); + void markChangelogAsCorrupted(final Set partitions); + default Map purgableOffsets() { return Collections.emptyMap(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 0edd7f729b2..48747e931ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskIdFormatException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; import java.io.File; @@ -56,40 +57,42 @@ public class TaskManager { // by QueryableState private final Logger log; private final UUID processId; - private final ChangelogReader changelogReader; private final String logPrefix; - private final StreamThread.AbstractTaskCreator taskCreator; + private final InternalTopologyBuilder builder; + private final ChangelogReader changelogReader; + private final StreamsMetricsImpl streamsMetrics; + private final StreamThread.AbstractTaskCreator activeTaskCreator; private final StreamThread.AbstractTaskCreator standbyTaskCreator; - private final Admin adminClient; - private DeleteRecordsResult deleteRecordsResult; - private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit - private final Map tasks = new TreeMap<>(); // materializing this relationship because the lookup is on the hot path private final Map partitionToTask = new HashMap<>(); + private final Admin adminClient; private Consumer consumer; - private final InternalTopologyBuilder builder; + private DeleteRecordsResult deleteRecordsResult; + + private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit TaskManager(final ChangelogReader changelogReader, final UUID processId, final String logPrefix, - final StreamThread.AbstractTaskCreator taskCreator, + final StreamsMetricsImpl streamsMetrics, + final StreamThread.AbstractTaskCreator activeTaskCreator, final StreamThread.AbstractTaskCreator standbyTaskCreator, final InternalTopologyBuilder builder, final Admin adminClient) { - this.changelogReader = changelogReader; + this.builder = builder; this.processId = processId; this.logPrefix = logPrefix; - this.taskCreator = taskCreator; + this.adminClient = adminClient; + this.streamsMetrics = streamsMetrics; + this.changelogReader = changelogReader; + this.activeTaskCreator = activeTaskCreator; this.standbyTaskCreator = standbyTaskCreator; - this.builder = builder; - final LogContext logContext = new LogContext(logPrefix); - - log = logContext.logger(getClass()); - this.adminClient = adminClient; + final LogContext logContext = new LogContext(logPrefix); + this.log = logContext.logger(getClass()); } void setConsumer(final Consumer consumer) { @@ -118,6 +121,29 @@ public class TaskManager { rebalanceInProgress = false; } + void handleCorruption(final Map> taskWithChangelogs) { + for (final Map.Entry> entry : taskWithChangelogs.entrySet()) { + final TaskId taskId = entry.getKey(); + final Task task = tasks.get(taskId); + + // this call is idempotent so even if the task is only CREATED we can still call it + changelogReader.remove(task.changelogPartitions()); + + // mark corrupted partitions to not be checkpointed, and then close the task as dirty + final Set corruptedPartitions = entry.getValue(); + task.markChangelogAsCorrupted(corruptedPartitions); + + try { + task.closeClean(); + } catch (final RuntimeException e) { + log.error("Failed to close task {} cleanly while handling corrupted tasks. Attempting to re-close it as dirty.", task.id()); + task.closeDirty(); + } + + task.revive(); + } + } + /** * @throws TaskMigratedException if the task producer got fenced (EOS only) * @throws StreamsException fatal error while creating / initializing the task @@ -148,9 +174,9 @@ public class TaskManager { task.resume(); standbyTasksToCreate.remove(task.id()); } else /* we previously owned this task, and we don't have it anymore, or it has changed active/standby state */ { - final Set inputPartitions = task.inputPartitions(); + cleanupTask(task); + try { - changelogReader.remove(task.changelogPartitions()); task.closeClean(); } catch (final RuntimeException e) { log.error(String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id()), e); @@ -159,9 +185,7 @@ public class TaskManager { // Now, we should go ahead and complete the close because a half-closed task is no good to anyone. task.closeDirty(); } - for (final TopicPartition inputPartition : inputPartitions) { - partitionToTask.remove(inputPartition); - } + iterator.remove(); } } @@ -176,7 +200,7 @@ public class TaskManager { } if (!activeTasksToCreate.isEmpty()) { - taskCreator.createTasks(consumer, activeTasksToCreate).forEach(this::addNewTask); + activeTaskCreator.createTasks(consumer, activeTasksToCreate).forEach(this::addNewTask); } if (!standbyTasksToCreate.isEmpty()) { @@ -293,18 +317,13 @@ public class TaskManager { final Iterator iterator = tasks.values().iterator(); while (iterator.hasNext()) { final Task task = iterator.next(); - final Set inputPartitions = task.inputPartitions(); // Even though we've apparently dropped out of the group, we can continue safely to maintain our // standby tasks while we rejoin. if (task.isActive()) { + cleanupTask(task); task.closeDirty(); - changelogReader.remove(task.changelogPartitions()); - } - - for (final TopicPartition inputPartition : inputPartitions) { - partitionToTask.remove(inputPartition); + iterator.remove(); } - iterator.remove(); } } @@ -320,7 +339,7 @@ public class TaskManager { final Set locallyStoredTasks = new HashSet<>(); - final File[] stateDirs = taskCreator.stateDirectory().listTaskDirectories(); + final File[] stateDirs = activeTaskCreator.stateDirectory().listTaskDirectories(); if (stateDirs != null) { for (final File dir : stateDirs) { try { @@ -339,12 +358,27 @@ public class TaskManager { return locallyStoredTasks; } + private void cleanupTask(final Task task) { + // 1. remove the changelog partitions from changelog reader; + // 2. remove the input partitions from the materialized map; + // 3. remove the task metrics from the metrics registry + changelogReader.remove(task.changelogPartitions()); + + for (final TopicPartition inputPartition : task.inputPartitions()) { + partitionToTask.remove(inputPartition); + } + + final String threadId = Thread.currentThread().getName(); + streamsMetrics.removeAllTaskLevelSensors(threadId, task.id().toString()); + } + void shutdown(final boolean clean) { final AtomicReference firstException = new AtomicReference<>(null); final Iterator iterator = tasks.values().iterator(); while (iterator.hasNext()) { final Task task = iterator.next(); - final Set inputPartitions = task.inputPartitions(); + cleanupTask(task); + if (clean) { try { task.closeClean(); @@ -358,16 +392,10 @@ public class TaskManager { } else { task.closeDirty(); } - changelogReader.remove(task.changelogPartitions()); - - for (final TopicPartition inputPartition : inputPartitions) { - partitionToTask.remove(inputPartition); - } - iterator.remove(); } - taskCreator.close(); + activeTaskCreator.close(); final RuntimeException fatalException = firstException.get(); if (fatalException != null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 9909e9c0e99..0a4315b6b75 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1321,6 +1321,46 @@ public class StreamTaskTest { verify(stateManager); } + @Test + public void shouldNotCommitOnSuspendRestoring() { + stateManager.flush(); + EasyMock.expectLastCall(); + stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); + EasyMock.expectLastCall(); + recordCollector.commit(EasyMock.anyObject()); + EasyMock.expectLastCall().andThrow(new AssertionError("Should not call this function")).anyTimes(); + EasyMock.replay(stateManager); + + task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); + + task.initializeIfNeeded(); + task.suspend(); + + assertEquals(Task.State.SUSPENDED, task.state()); + + verify(stateManager); + } + + @Test + public void shouldNotCommitOnCloseRestoring() { + stateManager.flush(); + EasyMock.expectLastCall(); + stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); + EasyMock.expectLastCall(); + recordCollector.commit(EasyMock.anyObject()); + EasyMock.expectLastCall().andThrow(new AssertionError("Should not call this function")).anyTimes(); + EasyMock.replay(stateManager); + + task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); + + task.initializeIfNeeded(); + task.closeClean(); + + assertEquals(Task.State.CLOSED, task.state()); + + verify(stateManager); + } + @Test public void shouldCommitOnCloseClean() { final long offset = 543L; @@ -1339,6 +1379,7 @@ public class StreamTaskTest { task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); task.initializeIfNeeded(); + task.completeRestoration(); task.closeClean(); assertEquals(Task.State.CLOSED, task.state()); @@ -1365,6 +1406,7 @@ public class StreamTaskTest { task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); task.initializeIfNeeded(); + task.completeRestoration(); assertThrows(ProcessorStateException.class, task::closeClean); @@ -1418,7 +1460,7 @@ public class StreamTaskTest { EasyMock.expectLastCall(); stateManager.flush(); EasyMock.expectLastCall(); - stateManager.checkpoint(Collections.singletonMap(changelogPartition, offset)); + stateManager.checkpoint(Collections.emptyMap()); EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); stateManager.close(); EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 6fa70aa4e0c..2570d4a3d86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -73,7 +73,6 @@ import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -1364,12 +1363,11 @@ public class StreamThreadTest { assertTrue(metadata.standbyTasks().isEmpty()); } - @Ignore @Test - // FIXME: should unblock this test after we added invalid offset handling public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception { internalStreamsBuilder.stream(Collections.singleton("topic"), consumed) - .groupByKey().count(Materialized.as("count")); + .groupByKey() + .count(Materialized.as("count")); internalStreamsBuilder.buildAndOptimizeTopology(); final StreamThread thread = createStreamThread("clientId", config, false); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index dfd29c63fd3..575e98ad851 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -25,9 +25,12 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; @@ -115,9 +118,11 @@ public class TaskManagerTest { @Before public void setUp() { + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST); taskManager = new TaskManager(changeLogReader, UUID.randomUUID(), - "", + "taskManagerTest", + streamsMetrics, activeTaskCreator, standbyTaskCreator, topologyBuilder,